airbyte_cdk.sources.declarative.async_job.job_orchestrator

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import logging
import threading
import time
import traceback
import uuid
from datetime import datetime, timedelta, timezone
from typing import (
    Any,
    Generator,
    Generic,
    Iterable,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
)

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_tracker import (
    ConcurrentJobLimitReached,
    JobTracker,
)
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

LOGGER = logging.getLogger("airbyte")
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}


class AsyncPartition:
    """
    This bucket of api_jobs is a bit useless for this iteration but should become interesting once we are able to split jobs
    """

    _DEFAULT_MAX_JOB_RETRY = 3

    def __init__(
        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
    ) -> None:
        self._attempts_per_job = {job: 1 for job in jobs}
        self._stream_slice = stream_slice
        self._job_max_retry = (
            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
        )

    def has_reached_max_attempt(self) -> bool:
        return any(
            map(
                lambda attempt_count: attempt_count >= self._job_max_retry,
                self._attempts_per_job.values(),
            )
        )

    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
        if current_attempt_count is None:
            raise ValueError("Could not find job to replace")
        elif current_attempt_count >= self._job_max_retry:
            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")

        new_attempt_count = current_attempt_count + 1
        for job in new_jobs:
            self._attempts_per_job[job] = new_attempt_count

    def should_split(self, job: AsyncJob) -> bool:
        """
        Not used right now but once we support job splitting, we should split based on the number of attempts
        """
        return False

    @property
    def jobs(self) -> Iterable[AsyncJob]:
        return self._attempts_per_job.keys()

    @property
    def stream_slice(self) -> StreamSlice:
        return self._stream_slice

    @property
    def status(self) -> AsyncJobStatus:
        """
        Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed
        or skipped. A partition is SKIPPED only when all jobs are SKIPPED (or a mix of COMPLETED and SKIPPED).
        """
        statuses = set(map(lambda job: job.status(), self.jobs))
        if statuses == {AsyncJobStatus.COMPLETED}:
            return AsyncJobStatus.COMPLETED
        elif statuses == {AsyncJobStatus.SKIPPED}:
            return AsyncJobStatus.SKIPPED
        elif statuses <= {AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED}:
            # Mix of completed and skipped: treat as completed so records are fetched for the completed jobs
            return AsyncJobStatus.COMPLETED
        elif AsyncJobStatus.FAILED in statuses:
            return AsyncJobStatus.FAILED
        elif AsyncJobStatus.TIMED_OUT in statuses:
            return AsyncJobStatus.TIMED_OUT
        else:
            return AsyncJobStatus.RUNNING

    def __repr__(self) -> str:
        return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})"

    def __json_serializable__(self) -> Any:
        return self._stream_slice


T = TypeVar("T")


class LookaheadIterator(Generic[T]):
    def __init__(self, iterable: Iterable[T]) -> None:
        self._iterator = iter(iterable)
        self._buffer: List[T] = []

    def __iter__(self) -> "LookaheadIterator[T]":
        return self

    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.pop()
        else:
            return next(self._iterator)

    def has_next(self) -> bool:
        if self._buffer:
            return True

        try:
            self._buffer = [next(self._iterator)]
        except StopIteration:
            return False
        else:
            return True

    def add_at_the_beginning(self, item: T) -> None:
        self._buffer = [item] + self._buffer


class AsyncJobOrchestrator:
    _WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS = 5
    _KNOWN_JOB_STATUSES = {
        AsyncJobStatus.COMPLETED,
        AsyncJobStatus.FAILED,
        AsyncJobStatus.RUNNING,
        AsyncJobStatus.TIMED_OUT,
        AsyncJobStatus.SKIPPED,
    }
    _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

    def __init__(
        self,
        job_repository: AsyncJobRepository,
        slices: Iterable[StreamSlice],
        job_tracker: JobTracker,
        message_repository: MessageRepository,
        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
        has_bulk_parent: bool = False,
        job_max_retry: Optional[int] = None,
        failed_retry_wait_time_in_seconds: Optional[int] = None,
    ) -> None:
        """
        If the stream slices provided as parameters rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
        needs to be set to True as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
        where the child has taken up all the job budget without room for the parent to create more, which would lead to an infinite
        loop of "trying to start a parent job" and "ConcurrentJobLimitReached".
        """
        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
            # this is to prevent developers from updating the possible statuses without updating the logic of this class
            raise ValueError(
                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
            )

        if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
            raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

        self._job_repository: AsyncJobRepository = job_repository
        self._slice_iterator = LookaheadIterator(slices)
        self._running_partitions: List[AsyncPartition] = []
        self._job_tracker = job_tracker
        self._message_repository = message_repository
        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
        self._has_bulk_parent = has_bulk_parent
        self._job_max_retry = job_max_retry
        self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds

        self._non_breaking_exceptions: List[Exception] = []

    def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
        jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
        for job in jobs_to_replace:
            if (
                self._failed_retry_wait_time_in_seconds is not None
                and job.status() == AsyncJobStatus.FAILED
                and not job.is_creation_failure()
            ):
                if not job.ready_to_retry():
                    lazy_log(
                        LOGGER,
                        logging.DEBUG,
                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
                    )
                    continue
                if not job.retry_deferred():
                    job.set_retry_after(
                        datetime.now(tz=timezone.utc)
                        + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
                    )
                    lazy_log(
                        LOGGER,
                        logging.INFO,
                        lambda: f"Job {job.api_job_id()} failed. Deferring retry for {self._failed_retry_wait_time_in_seconds} seconds.",
                    )
                    continue
            new_job = self._start_job(job.job_parameters(), job.api_job_id())
            partition.replace_job(job, [new_job])

    def _start_jobs(self) -> None:
        """
        Retry failed jobs and start jobs for each slice in the slice iterator.
        This method iterates over the running jobs and slice iterator and starts a job for each slice.
        The started jobs are added to the running partitions.
        Returns:
            None

        However, the first iteration is for sendgrid which only has one job.
        """
        at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = False
        _slice = None
        try:
            for partition in self._running_partitions:
                self._replace_failed_jobs(partition)

            if (
                self._has_bulk_parent
                and self._running_partitions
                and self._slice_iterator.has_next()
            ):
                LOGGER.debug(
                    "This AsyncJobOrchestrator is operating as a child of a bulk stream hence we limit the number of concurrent jobs on the child until there are no more parent slices to avoid the child taking all the API job budget"
                )
                return

            for _slice in self._slice_iterator:
                at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
                job = self._start_job(_slice)
                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
                if self._has_bulk_parent and self._slice_iterator.has_next():
                    break
        except ConcurrentJobLimitReached:
            if at_least_one_slice_consumed_from_slice_iterator_during_current_iteration:
                # this means a slice has been consumed but the job couldn't be created, therefore we need to put it back at the beginning of the _slice_iterator
                self._slice_iterator.add_at_the_beginning(_slice)  # type: ignore  # we know it's not None here because `ConcurrentJobLimitReached` happens during the for loop
            LOGGER.debug(
                "Waiting before creating more jobs as the limit of concurrent jobs has been reached. Will try again later..."
            )

    def _start_job(self, _slice: StreamSlice, previous_job_id: Optional[str] = None) -> AsyncJob:
        if previous_job_id:
            id_to_replace = previous_job_id
            lazy_log(LOGGER, logging.DEBUG, lambda: f"Attempting to replace job {id_to_replace}...")
        else:
            id_to_replace = self._job_tracker.try_to_get_intent()

        try:
            job = self._job_repository.start(_slice)
            self._job_tracker.add_job(id_to_replace, job.api_job_id())
            return job
        except Exception as exception:
            LOGGER.warning(f"Exception has occurred during job creation: {exception}")
            if self._is_breaking_exception(exception):
                self._job_tracker.remove_job(id_to_replace)
                raise exception
            return self._keep_api_budget_with_failed_job(_slice, exception, id_to_replace)

    def _keep_api_budget_with_failed_job(
        self, _slice: StreamSlice, exception: Exception, intent: str
    ) -> AsyncJob:
        """
        We have a mechanism to retry jobs. It is used when a job status is FAILED or TIMED_OUT. The easiest way to retry is to have
        this job created in a failed state and leverage the retry for failed/timed out jobs. This way, we don't need another process
        for retrying jobs that couldn't be started.
        """
        LOGGER.warning(
            f"Could not start job for slice {_slice}. Job will be flagged as failed and retried if the max number of attempts has not been reached: {exception}"
        )
        traced_exception = (
            exception
            if isinstance(exception, AirbyteTracedException)
            else AirbyteTracedException.from_exception(exception)
        )
        # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
        # we would keep the exceptions in-memory until we know that we have reached the max attempt.
        self._message_repository.emit_message(traced_exception.as_airbyte_message())
        job = self._create_failed_job(_slice)
        self._job_tracker.add_job(intent, job.api_job_id())
        return job

    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
        job = AsyncJob(
            f"{uuid.uuid4()} - Job that could not start",
            stream_slice,
            _NO_TIMEOUT,
            is_creation_failure=True,
        )
        job.update_status(AsyncJobStatus.FAILED)
        return job

    def _get_running_jobs(self) -> Set[AsyncJob]:
        """
        Returns a set of running AsyncJob objects.

        Returns:
            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
        """
        return {
            job
            for partition in self._running_partitions
            for job in partition.jobs
            if job.status() == AsyncJobStatus.RUNNING
        }

    def _update_jobs_status(self) -> None:
        """
        Update the status of all running jobs in the repository.
        """
        running_jobs = self._get_running_jobs()
        if running_jobs:
            # update the status only if there are RUNNING jobs
            self._job_repository.update_jobs_status(running_jobs)

    def _wait_on_status_update(self) -> None:
        """
        Waits for a specified amount of time between status updates.

        This method is used to introduce a delay between status updates in order to avoid excessive polling.
        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.

        Returns:
            None
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
        )

        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
        )
        time.sleep(self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS)

    def _process_completed_partition(self, partition: AsyncPartition) -> None:
        """
        Process a completed partition.
        Args:
            partition (AsyncPartition): The completed partition to process.
        """
        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
        LOGGER.info(
            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
        )

        # It is important to remove the jobs from the job tracker before yielding the partition as the caller might try to schedule jobs
        # but wouldn't be able to because all job slots are taken even though the jobs are done.
        for job in partition.jobs:
            self._job_tracker.remove_job(job.api_job_id())

    def _process_skipped_partition(self, partition: AsyncPartition) -> None:
        """
        Process a skipped partition. The API indicated there is no data to return for this job
        (e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job
        allocation without fetching any records or raising errors.
        """
        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
        LOGGER.info(
            f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}."
        )
        for job in partition.jobs:
            self._job_tracker.remove_job(job.api_job_id())

    def _process_running_partitions_and_yield_completed_ones(
        self,
    ) -> Generator[AsyncPartition, Any, None]:
        """
        Process the running partitions.

        Yields:
            AsyncPartition: The processed partition.

        Raises:
            Any: Any exception raised during processing.
        """
        current_running_partitions: List[AsyncPartition] = []
        for partition in self._running_partitions:
            match partition.status:
                case AsyncJobStatus.COMPLETED:
                    self._process_completed_partition(partition)
                    yield partition
                case AsyncJobStatus.SKIPPED:
                    self._process_skipped_partition(partition)
                case AsyncJobStatus.RUNNING:
                    current_running_partitions.append(partition)
                case _ if partition.has_reached_max_attempt():
                    self._stop_partition(partition)
                    self._process_partitions_with_errors(partition)
                case _:
                    self._stop_timed_out_jobs(partition)
                    # re-allocate FAILED jobs, but TIMED_OUT jobs are not re-allocated
                    self._reallocate_partition(current_running_partitions, partition)

            # We only remove completed / timed-out jobs as we want failed jobs to be re-allocated with priority
            self._remove_completed_jobs(partition)

        # update the referenced list with running partitions
        self._running_partitions = current_running_partitions

    def _stop_partition(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() in _API_SIDE_RUNNING_STATUS:
                self._abort_job(job, free_job_allocation=True)
            else:
                self._job_tracker.remove_job(job.api_job_id())

    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.TIMED_OUT:
                self._abort_job(job, free_job_allocation=False)

    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
        try:
            self._job_repository.abort(job)
            if free_job_allocation:
                self._job_tracker.remove_job(job.api_job_id())
        except Exception as exception:
            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")

    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
        """
        Remove completed jobs from the job tracker so their budget is freed.

        Args:
            partition (AsyncPartition): The partition to process.
        """
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.COMPLETED:
                self._job_tracker.remove_job(job.api_job_id())

    def _reallocate_partition(
        self,
        current_running_partitions: List[AsyncPartition],
        partition: AsyncPartition,
    ) -> None:
        """
        Keep the partition at the front of the running partitions so that its failed jobs are
        replaced with priority on the next iteration.
        Args:
            current_running_partitions (list): The list of currently running partitions.
            partition (AsyncPartition): The partition to reallocate.
        """
        current_running_partitions.insert(0, partition)

    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
        """
        Process a partition whose jobs ended in error (FAILED or TIMED_OUT) by recording a
        non-breaking exception; the exception is raised later, once the orchestrator loop ends.

        Args:
            partition (AsyncPartition): The partition to process.
        """
        status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
        self._non_breaking_exceptions.append(
            AirbyteTracedException(
                message="Async job failed after exhausting all retry attempts.",
                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
                failure_type=FailureType.system_error,
            )
        )

    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
        """
        Creates and retrieves completed partitions.
        This method continuously starts jobs, updates job status, processes running partitions,
        logs polling partitions, and waits for status updates. It yields completed partitions
        as they become available.

        Returns:
            An iterable of completed partitions, represented as AsyncPartition objects.
        """
        while True:
            try:
                lazy_log(
                    LOGGER,
                    logging.DEBUG,
                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
                )
                self._start_jobs()
                if not self._slice_iterator.has_next() and not self._running_partitions:
                    break

                self._update_jobs_status()
                yield from self._process_running_partitions_and_yield_completed_ones()
                self._wait_on_status_update()
            except Exception as exception:
                LOGGER.warning(
                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
                )
                if self._is_breaking_exception(exception):
                    self._abort_all_running_jobs()
                    raise exception

                self._non_breaking_exceptions.append(exception)

        LOGGER.info(
            f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
        )
        if self._non_breaking_exceptions:
            # We emitted traced messages but we didn't break on non-breaking exceptions. We still need to raise an exception so that the
            # caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
            raise AirbyteTracedException(
                message="One or more async jobs failed after exhausting all retry attempts.",
                internal_message="\n".join(
                    [
                        filter_secrets(exception.__repr__())
                        for exception in self._non_breaking_exceptions
                    ]
                ),
                failure_type=FailureType.system_error,
            )

    def _handle_non_breaking_error(self, exception: Exception) -> None:
        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
        self._non_breaking_exceptions.append(exception)

    def _abort_all_running_jobs(self) -> None:
        for partition in self._running_partitions:
            for job in partition.jobs:
                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
                    self._abort_job(job, free_job_allocation=True)
                self._job_tracker.remove_job(job.api_job_id())

        self._running_partitions = []

    def _is_breaking_exception(self, exception: Exception) -> bool:
        return isinstance(exception, self._exceptions_to_break_on) or (
            isinstance(exception, AirbyteTracedException)
            and exception.failure_type == FailureType.config_error
        )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        Fetches records from the given jobs and deletes each job once its records have been read.

        Args:
            async_jobs (Iterable[AsyncJob]): The jobs to fetch records from.

        Yields:
            Mapping[str, Any]: The fetched records from the jobs.
        """
        for job in async_jobs:
            yield from self._job_repository.fetch_records(job)
            self._job_repository.delete(job)
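The rest of this page collects a few hedged usage sketches. First, to make AsyncPartition's status resolution and retry accounting concrete: a minimal sketch, assuming AsyncJob can be constructed and driven the way _create_failed_job above does it (an id string, a slice, and a timeout, with update_status setting the state). The job ids are invented for illustration.

from datetime import timedelta

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.types import StreamSlice

_slice = StreamSlice(partition={"report": "2024-01"}, cursor_slice={})
# Constructor arguments mirror the ones used by _create_failed_job above.
job_a = AsyncJob("job-a", _slice, timedelta(minutes=30))
job_b = AsyncJob("job-b", _slice, timedelta(minutes=30))
partition = AsyncPartition([job_a, job_b], _slice)

job_a.update_status(AsyncJobStatus.COMPLETED)
job_b.update_status(AsyncJobStatus.SKIPPED)
# A mix of COMPLETED and SKIPPED resolves to COMPLETED so that records are
# still fetched for the job that did complete.
assert partition.status == AsyncJobStatus.COMPLETED

job_b.update_status(AsyncJobStatus.FAILED)
# FAILED has the highest priority.
assert partition.status == AsyncJobStatus.FAILED

# Replacing the failed job bumps the attempt count of its replacement (1 -> 2),
# which stays under the default maximum of 3 attempts.
job_c = AsyncJob("job-c", _slice, timedelta(minutes=30))
partition.replace_job(job_b, [job_c])
assert not partition.has_reached_max_attempt()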

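LookaheadIterator is how the orchestrator peeks at the remaining slices (has_next) and puts a slice back when job creation hits the concurrency limit. A small usage sketch, with plain strings standing in for stream slices:

from airbyte_cdk.sources.declarative.async_job.job_orchestrator import LookaheadIterator

slices = LookaheadIterator(["slice-1", "slice-2"])

assert slices.has_next()  # peeks by buffering the next item
first = next(slices)      # consumes the buffered "slice-1"

# A slice whose job could not be created (ConcurrentJobLimitReached) is pushed
# back so it is retried first on the next call of _start_jobs.
slices.add_at_the_beginning(first)
assert next(slices) == "slice-1"
assert next(slices) == "slice-2"
assert not slices.has_next()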
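As a sketch of the constructor guards: a non-positive failed_retry_wait_time_in_seconds is rejected before any collaborator is touched, so placeholder arguments suffice here. The JobTracker constructor is assumed to take a maximum number of concurrent jobs.

from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.message import InMemoryMessageRepository

try:
    AsyncJobOrchestrator(
        job_repository=None,  # type: ignore  # never used: validation raises first
        slices=[],
        job_tracker=JobTracker(1),  # assumed: maximum number of concurrent jobs
        message_repository=InMemoryMessageRepository(),
        failed_retry_wait_time_in_seconds=0,
    )
except ValueError as error:
    print(error)  # failed_retry_wait_time_in_seconds must be >= 1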
311
312    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
313        job = AsyncJob(
314            f"{uuid.uuid4()} - Job that could not start",
315            stream_slice,
316            _NO_TIMEOUT,
317            is_creation_failure=True,
318        )
319        job.update_status(AsyncJobStatus.FAILED)
320        return job
321
322    def _get_running_jobs(self) -> Set[AsyncJob]:
323        """
324        Returns a set of running AsyncJob objects.
325
326        Returns:
327            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
328        """
329        return {
330            job
331            for partition in self._running_partitions
332            for job in partition.jobs
333            if job.status() == AsyncJobStatus.RUNNING
334        }
335
336    def _update_jobs_status(self) -> None:
337        """
338        Update the status of all running jobs in the repository.
339        """
340        running_jobs = self._get_running_jobs()
341        if running_jobs:
342            # update the status only if there are RUNNING jobs
343            self._job_repository.update_jobs_status(running_jobs)
344
345    def _wait_on_status_update(self) -> None:
346        """
347        Waits for a specified amount of time between status updates.
348
349
350        This method is used to introduce a delay between status updates in order to avoid excessive polling.
351        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.
352
353        Returns:
354            None
355        """
356        lazy_log(
357            LOGGER,
358            logging.DEBUG,
359            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
360        )
361
362        lazy_log(
363            LOGGER,
364            logging.DEBUG,
365            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
366        )
367        time.sleep(self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS)
368
369    def _process_completed_partition(self, partition: AsyncPartition) -> None:
370        """
371        Process a completed partition.
372        Args:
373            partition (AsyncPartition): The completed partition to process.
374        """
375        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
376        LOGGER.info(
377            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
378        )
379
380        # It is important to remove the jobs from the job tracker before yielding the partition as the caller might try to schedule jobs
381        # but won't be able to as all jobs slots are taken even though job is done.
382        for job in partition.jobs:
383            self._job_tracker.remove_job(job.api_job_id())
384
385    def _process_skipped_partition(self, partition: AsyncPartition) -> None:
386        """
387        Process a skipped partition. The API indicated there is no data to return for this job
388        (e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job
389        allocation without fetching any records or raising errors.
390        """
391        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
392        LOGGER.info(
393            f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}."
394        )
395        for job in partition.jobs:
396            self._job_tracker.remove_job(job.api_job_id())
397
398    def _process_running_partitions_and_yield_completed_ones(
399        self,
400    ) -> Generator[AsyncPartition, Any, None]:
401        """
402        Process the running partitions.
403
404        Yields:
405            AsyncPartition: The processed partition.
406
407        Raises:
408            Any: Any exception raised during processing.
409        """
410        current_running_partitions: List[AsyncPartition] = []
411        for partition in self._running_partitions:
412            match partition.status:
413                case AsyncJobStatus.COMPLETED:
414                    self._process_completed_partition(partition)
415                    yield partition
416                case AsyncJobStatus.SKIPPED:
417                    self._process_skipped_partition(partition)
418                case AsyncJobStatus.RUNNING:
419                    current_running_partitions.append(partition)
420                case _ if partition.has_reached_max_attempt():
421                    self._stop_partition(partition)
422                    self._process_partitions_with_errors(partition)
423                case _:
424                    self._stop_timed_out_jobs(partition)
425                    # re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated
426                    self._reallocate_partition(current_running_partitions, partition)
427
428            # We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority
429            self._remove_completed_jobs(partition)
430
431        # update the referenced list with running partitions
432        self._running_partitions = current_running_partitions
433
434    def _stop_partition(self, partition: AsyncPartition) -> None:
435        for job in partition.jobs:
436            if job.status() in _API_SIDE_RUNNING_STATUS:
437                self._abort_job(job, free_job_allocation=True)
438            else:
439                self._job_tracker.remove_job(job.api_job_id())
440
441    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
442        for job in partition.jobs:
443            if job.status() == AsyncJobStatus.TIMED_OUT:
444                self._abort_job(job, free_job_allocation=False)
445
446    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
447        try:
448            self._job_repository.abort(job)
449            if free_job_allocation:
450                self._job_tracker.remove_job(job.api_job_id())
451        except Exception as exception:
452            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")
453
454    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
455        """
456        Remove completed or timed out jobs from the partition.
457
458        Args:
459            partition (AsyncPartition): The partition to process.
460        """
461        for job in partition.jobs:
462            if job.status() == AsyncJobStatus.COMPLETED:
463                self._job_tracker.remove_job(job.api_job_id())
464
465    def _reallocate_partition(
466        self,
467        current_running_partitions: List[AsyncPartition],
468        partition: AsyncPartition,
469    ) -> None:
470        """
471        Reallocate the partition by starting a new job for each job in the
472        partition.
473        Args:
474            current_running_partitions (list): The list of currently running partitions.
475            partition (AsyncPartition): The partition to reallocate.
476        """
477        current_running_partitions.insert(0, partition)
478
479    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
480        """
481        Process a partition with status errors (FAILED and TIMEOUT).
482
483        Args:
484            partition (AsyncPartition): The partition to process.
485        Returns:
486            AirbyteTracedException: An exception indicating that at least one job could not be completed.
487        Raises:
488            AirbyteTracedException: If at least one job could not be completed.
489        """
490        status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
491        self._non_breaking_exceptions.append(
492            AirbyteTracedException(
493                message="Async job failed after exhausting all retry attempts.",
494                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
495                failure_type=FailureType.system_error,
496            )
497        )
498
499    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
500        """
501        Creates and retrieves completed partitions.
502        This method continuously starts jobs, updates job status, processes running partitions,
503        logs polling partitions, and waits for status updates. It yields completed partitions
504        as they become available.
505
506        Returns:
507            An iterable of completed partitions, represented as AsyncPartition objects.
508            Each partition is wrapped in an Optional, allowing for None values.
509        """
510        while True:
511            try:
512                lazy_log(
513                    LOGGER,
514                    logging.DEBUG,
515                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
516                )
517                self._start_jobs()
518                if not self._slice_iterator.has_next() and not self._running_partitions:
519                    break
520
521                self._update_jobs_status()
522                yield from self._process_running_partitions_and_yield_completed_ones()
523                self._wait_on_status_update()
524            except Exception as exception:
525                LOGGER.warning(
526                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
527                )
528                if self._is_breaking_exception(exception):
529                    self._abort_all_running_jobs()
530                    raise exception
531
532                self._non_breaking_exceptions.append(exception)
533
534        LOGGER.info(
535            f"JobOrchestrator loop - Thread (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
536        )
537        if self._non_breaking_exceptions:
538            # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
539            # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
540            raise AirbyteTracedException(
541                message="One or more async jobs failed after exhausting all retry attempts.",
542                internal_message="\n".join(
543                    [
544                        filter_secrets(exception.__repr__())
545                        for exception in self._non_breaking_exceptions
546                    ]
547                ),
548                failure_type=FailureType.system_error,
549            )
550
551    def _handle_non_breaking_error(self, exception: Exception) -> None:
552        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
553        self._non_breaking_exceptions.append(exception)
554
555    def _abort_all_running_jobs(self) -> None:
556        for partition in self._running_partitions:
557            for job in partition.jobs:
558                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
559                    self._abort_job(job, free_job_allocation=True)
560                self._job_tracker.remove_job(job.api_job_id())
561
562        self._running_partitions = []
563
564    def _is_breaking_exception(self, exception: Exception) -> bool:
565        return isinstance(exception, self._exceptions_to_break_on) or (
566            isinstance(exception, AirbyteTracedException)
567            and exception.failure_type == FailureType.config_error
568        )
569
570    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
571        """
572        Fetches records from the given jobs.
573
574        Args:
575            async_jobs Iterable[AsyncJob]: The list of AsyncJobs.
576
577        Yields:
578            Iterable[Mapping[str, Any]]: The fetched records from the jobs.
579        """
580        for job in async_jobs:
581            yield from self._job_repository.fetch_records(job)
582            self._job_repository.delete(job)
AsyncJobOrchestrator( job_repository: airbyte_cdk.sources.declarative.async_job.repository.AsyncJobRepository, slices: Iterable[airbyte_cdk.StreamSlice], job_tracker: airbyte_cdk.sources.declarative.async_job.job_tracker.JobTracker, message_repository: airbyte_cdk.MessageRepository, exceptions_to_break_on: Iterable[Type[Exception]] = (), has_bulk_parent: bool = False, job_max_retry: Optional[int] = None, failed_retry_wait_time_in_seconds: Optional[int] = None)
163    def __init__(
164        self,
165        job_repository: AsyncJobRepository,
166        slices: Iterable[StreamSlice],
167        job_tracker: JobTracker,
168        message_repository: MessageRepository,
169        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
170        has_bulk_parent: bool = False,
171        job_max_retry: Optional[int] = None,
172        failed_retry_wait_time_in_seconds: Optional[int] = None,
173    ) -> None:
174        """
175        If the stream slices provided as a parameter rely on an async job stream that relies on the same JobTracker, `has_bulk_parent`
176        needs to be set to True, as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
177        where the child has taken up the entire job budget, leaving no room for the parent to create more jobs, which would lead to an
178        infinite loop of "trying to start a parent job" and "ConcurrentJobLimitReached".
179        """
180        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
181            # this is to prevent developers updating the possible statuses without updating the logic of this class
182            raise ValueError(
183                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
184            )
185
186        if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
187            raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")
188
189        self._job_repository: AsyncJobRepository = job_repository
190        self._slice_iterator = LookaheadIterator(slices)
191        self._running_partitions: List[AsyncPartition] = []
192        self._job_tracker = job_tracker
193        self._message_repository = message_repository
194        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
195        self._has_bulk_parent = has_bulk_parent
196        self._job_max_retry = job_max_retry
197        self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds
198
199        self._non_breaking_exceptions: List[Exception] = []

If the stream slices provided as a parameter rely on an async job stream that relies on the same JobTracker, has_bulk_parent needs to be set to True, as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation where the child has taken up the entire job budget, leaving no room for the parent to create more jobs, which would lead to an infinite loop of "trying to start a parent job" and "ConcurrentJobLimitReached".
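To make the has_bulk_parent contract concrete, here is a minimal usage sketch (not part of the module): a child orchestrator that shares a JobTracker with its parent. ParentJobRepository, ChildJobRepository, parent_slices, and child_slices are hypothetical, connector-specific stand-ins.

    from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
    from airbyte_cdk.sources.message import InMemoryMessageRepository

    # A single tracker caps how many jobs may run concurrently across both streams.
    shared_tracker = JobTracker(limit=4)
    message_repository = InMemoryMessageRepository()

    parent_orchestrator = AsyncJobOrchestrator(
        job_repository=ParentJobRepository(),  # hypothetical AsyncJobRepository
        slices=parent_slices,                  # hypothetical Iterable[StreamSlice]
        job_tracker=shared_tracker,
        message_repository=message_repository,
    )

    # The child's slices are derived from the parent's output and the two share a
    # tracker, so the child must defer job creation to the parent.
    child_orchestrator = AsyncJobOrchestrator(
        job_repository=ChildJobRepository(),   # hypothetical AsyncJobRepository
        slices=child_slices,                   # hypothetical Iterable[StreamSlice]
        job_tracker=shared_tracker,
        message_repository=message_repository,
        has_bulk_parent=True,                  # prioritize the parent's job budget
    )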

def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
499    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
500        """
501        Creates and retrieves completed partitions.
502        This method continuously starts jobs, updates job statuses, processes running partitions,
503        logs partitions that are still being polled, and waits for status updates. It yields completed partitions
504        as they become available.
505
506        Returns:
507            An iterable of completed partitions, represented as AsyncPartition objects.
508            Each partition is yielded once all of its jobs have completed.
509        """
510        while True:
511            try:
512                lazy_log(
513                    LOGGER,
514                    logging.DEBUG,
515                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
516                )
517                self._start_jobs()
518                if not self._slice_iterator.has_next() and not self._running_partitions:
519                    break
520
521                self._update_jobs_status()
522                yield from self._process_running_partitions_and_yield_completed_ones()
523                self._wait_on_status_update()
524            except Exception as exception:
525                LOGGER.warning(
526                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
527                )
528                if self._is_breaking_exception(exception):
529                    self._abort_all_running_jobs()
530                    raise exception
531
532                self._non_breaking_exceptions.append(exception)
533
534        LOGGER.info(
535            f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
536        )
537        if self._non_breaking_exceptions:
538            # We emitted traced messages but did not break on non-breaking exceptions. We still need to raise an exception so that the
539            # caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
540            raise AirbyteTracedException(
541                message="One or more async jobs failed after exhausting all retry attempts.",
542                internal_message="\n".join(
543                    [
544                        filter_secrets(exception.__repr__())
545                        for exception in self._non_breaking_exceptions
546                    ]
547                ),
548                failure_type=FailureType.system_error,
549            )

Creates and retrieves completed partitions. This method continuously starts jobs, updates job statuses, processes running partitions, logs partitions that are still being polled, and waits for status updates. It yields completed partitions as they become available.

Returns:

An iterable of completed partitions, represented as AsyncPartition objects. Each partition is yielded once all of its jobs have completed.
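As a consumption sketch (assuming an orchestrator built as in the constructor example above): the partial-failure exception is raised only after the generator is exhausted, so it must be handled around the loop, not inside it.

    from airbyte_cdk.utils.traced_exception import AirbyteTracedException

    try:
        for partition in orchestrator.create_and_get_completed_partitions():
            # every yielded partition has all of its jobs completed
            process(partition)  # hypothetical downstream handling
    except AirbyteTracedException:
        # raised once the loop ends if any job exhausted its retries;
        # the sync is incomplete and the attempt should be failed
        raise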

def fetch_records(self, async_jobs: Iterable[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> Iterable[Mapping[str, Any]]:
570    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
571        """
572        Fetches records from the given jobs.
573
574        Args:
575            async_jobs (Iterable[AsyncJob]): The jobs to fetch records from.
576
577        Yields:
578            Iterable[Mapping[str, Any]]: The fetched records from the jobs.
579        """
580        for job in async_jobs:
581            yield from self._job_repository.fetch_records(job)
582            self._job_repository.delete(job)

Fetches records from the given jobs.

Arguments:
  • async_jobs (Iterable[AsyncJob]): The jobs to fetch records from.

Yields:

Iterable[Mapping[str, Any]]: The fetched records from the jobs.
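A final sketch tying the two methods together (again assuming the hypothetical orchestrator from above). Note that, per the source, each job is deleted right after its records are yielded, so records should be consumed before moving on.

    for partition in orchestrator.create_and_get_completed_partitions():
        for record in orchestrator.fetch_records(partition.jobs):
            # record is a Mapping[str, Any]; the job it came from is deleted
            # from the repository once its records have been exhausted
            emit(record)  # hypothetical record handling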