
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  3import logging
  4import threading
  5import time
  6import traceback
  7import uuid
  8from datetime import timedelta
  9from typing import (
 10    Any,
 11    Generator,
 12    Generic,
 13    Iterable,
 14    List,
 15    Mapping,
 16    Optional,
 17    Set,
 18    Tuple,
 19    Type,
 20    TypeVar,
 23from airbyte_cdk.logger import lazy_log
 24from airbyte_cdk.models import FailureType
 25from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
 26from airbyte_cdk.sources.declarative.async_job.job_tracker import (
 27    ConcurrentJobLimitReached,
 28    JobTracker,
 30from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
 31from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
 32from airbyte_cdk.sources.message import MessageRepository
 33from airbyte_cdk.sources.types import StreamSlice
 34from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
 35from airbyte_cdk.utils.traced_exception import AirbyteTracedException
 37LOGGER = logging.getLogger("airbyte")
 38_NO_TIMEOUT = timedelta.max
 42class AsyncPartition:
 43    """
 44    This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs
 45    """
 49    def __init__(
 50        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
 51    ) -> None:
 52        self._attempts_per_job = {job: 1 for job in jobs}
 53        self._stream_slice = stream_slice
 54        self._job_max_retry = (
 55            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
 56        )
 58    def has_reached_max_attempt(self) -> bool:
 59        return any(
 60            map(
 61                lambda attempt_count: attempt_count >= self._job_max_retry,
 62                self._attempts_per_job.values(),
 63            )
 64        )
 66    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
 67        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
 68        if current_attempt_count is None:
 69            raise ValueError("Could not find job to replace")
 70        elif current_attempt_count >= self._job_max_retry:
 71            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")
 73        new_attempt_count = current_attempt_count + 1
 74        for job in new_jobs:
 75            self._attempts_per_job[job] = new_attempt_count
 77    def should_split(self, job: AsyncJob) -> bool:
 78        """
 79        Not used right now but once we support job split, we should split based on the number of attempts
 80        """
 81        return False
 83    @property
 84    def jobs(self) -> Iterable[AsyncJob]:
 85        return self._attempts_per_job.keys()
 87    @property
 88    def stream_slice(self) -> StreamSlice:
 89        return self._stream_slice
 91    @property
 92    def status(self) -> AsyncJobStatus:
 93        """
 94        Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.
 95        """
 96        statuses = set(map(lambda job: job.status(),
 97        if statuses == {AsyncJobStatus.COMPLETED}:
 98            return AsyncJobStatus.COMPLETED
 99        elif AsyncJobStatus.FAILED in statuses:
100            return AsyncJobStatus.FAILED
101        elif AsyncJobStatus.TIMED_OUT in statuses:
102            return AsyncJobStatus.TIMED_OUT
103        else:
104            return AsyncJobStatus.RUNNING
106    def __repr__(self) -> str:
107        return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})"
109    def __json_serializable__(self) -> Any:
110        return self._stream_slice
T = TypeVar("T")


class LookaheadIterator(Generic[T]):
    """
    Iterator wrapper that can tell whether another item is available and accepts items pushed back to its front.

    Looked-ahead and pushed-back items live in `_buffer`; index 0 of the buffer is always the next item returned.
    """

    def __init__(self, iterable: Iterable[T]) -> None:
        self._iterator = iter(iterable)
        self._buffer: List[T] = []

    def __iter__(self) -> "LookaheadIterator[T]":
        return self

    def __next__(self) -> T:
        """Return the next item, draining the buffer (front first) before the wrapped iterator."""
        if self._buffer:
            # Take from the front so the order agrees with `add_at_the_beginning`.
            return self._buffer.pop(0)
        return next(self._iterator)

    def has_next(self) -> bool:
        """Return True if `__next__` would yield an item; may pull one item from the wrapped iterator into the buffer."""
        if self._buffer:
            return True

        try:
            self._buffer.append(next(self._iterator))
        except StopIteration:
            return False
        return True

    def add_at_the_beginning(self, item: T) -> None:
        """Push `item` back so that it is the very next item returned."""
        self._buffer.insert(0, item)
145class AsyncJobOrchestrator:
148        AsyncJobStatus.COMPLETED,
149        AsyncJobStatus.FAILED,
150        AsyncJobStatus.RUNNING,
151        AsyncJobStatus.TIMED_OUT,
152    }
153    _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
155    def __init__(
156        self,
157        job_repository: AsyncJobRepository,
158        slices: Iterable[StreamSlice],
159        job_tracker: JobTracker,
160        message_repository: MessageRepository,
161        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
162        has_bulk_parent: bool = False,
163        job_max_retry: Optional[int] = None,
164    ) -> None:
165        """
166        If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`
167        needs to be set to True as jobs creation needs to be prioritized on the parent level. Doing otherwise could lead to a situation
168        where the child has taken up all the job budget without room to the parent to create more which would lead to an infinite loop of
169        "trying to start a parent job" and "ConcurrentJobLimitReached".
170        """
171        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
172            # this is to prevent developers updating the possible statuses without updating the logic of this class
173            raise ValueError(
174                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
175            )
177        self._job_repository: AsyncJobRepository = job_repository
178        self._slice_iterator = LookaheadIterator(slices)
179        self._running_partitions: List[AsyncPartition] = []
180        self._job_tracker = job_tracker
181        self._message_repository = message_repository
182        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
183        self._has_bulk_parent = has_bulk_parent
184        self._job_max_retry = job_max_retry
186        self._non_breaking_exceptions: List[Exception] = []
188    def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
189        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
190        jobs_to_replace = [job for job in if job.status() in failed_status_jobs]
191        for job in jobs_to_replace:
192            new_job = self._start_job(job.job_parameters(), job.api_job_id())
193            partition.replace_job(job, [new_job])
195    def _start_jobs(self) -> None:
196        """
197        Retry failed jobs and start jobs for each slice in the slice iterator.
198        This method iterates over the running jobs and slice iterator and starts a job for each slice.
199        The started jobs are added to the running partitions.
200        Returns:
201            None
203        However, the first iteration is for sendgrid which only has one job.
204        """
205        at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = False
206        _slice = None
207        try:
208            for partition in self._running_partitions:
209                self._replace_failed_jobs(partition)
211            if (
212                self._has_bulk_parent
213                and self._running_partitions
214                and self._slice_iterator.has_next()
215            ):
216                LOGGER.debug(
217                    "This AsyncJobOrchestrator is operating as a child of a bulk stream hence we limit the number of concurrent jobs on the child until there are no more parent slices to avoid the child taking all the API job budget"
218                )
219                return
221            for _slice in self._slice_iterator:
222                at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
223                job = self._start_job(_slice)
224                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
225                if self._has_bulk_parent and self._slice_iterator.has_next():
226                    break
227        except ConcurrentJobLimitReached:
228            if at_least_one_slice_consumed_from_slice_iterator_during_current_iteration:
229                # this means a slice has been consumed but the job couldn't be create therefore we need to put it back at the beginning of the _slice_iterator
230                self._slice_iterator.add_at_the_beginning(_slice)  # type: ignore  # we know it's not None here because `ConcurrentJobLimitReached` happens during the for loop
231            LOGGER.debug(
232                "Waiting before creating more jobs as the limit of concurrent jobs has been reached. Will try again later..."
233            )
235    def _start_job(self, _slice: StreamSlice, previous_job_id: Optional[str] = None) -> AsyncJob:
236        if previous_job_id:
237            id_to_replace = previous_job_id
238            lazy_log(LOGGER, logging.DEBUG, lambda: f"Attempting to replace job {id_to_replace}...")
239        else:
240            id_to_replace = self._job_tracker.try_to_get_intent()
242        try:
243            job = self._job_repository.start(_slice)
244            self._job_tracker.add_job(id_to_replace, job.api_job_id())
245            return job
246        except Exception as exception:
247            LOGGER.warning(f"Exception has occurred during job creation: {exception}")
248            if self._is_breaking_exception(exception):
249                self._job_tracker.remove_job(id_to_replace)
250                raise exception
251            return self._keep_api_budget_with_failed_job(_slice, exception, id_to_replace)
253    def _keep_api_budget_with_failed_job(
254        self, _slice: StreamSlice, exception: Exception, intent: str
255    ) -> AsyncJob:
256        """
257        We have a mechanism to retry job. It is used when a job status is FAILED or TIMED_OUT. The easiest way to retry is to have this job
258        as created in a failed state and leverage the retry for failed/timed out jobs. This way, we don't have to have another process for
259        retrying jobs that couldn't be started.
260        """
261        LOGGER.warning(
262            f"Could not start job for slice {_slice}. Job will be flagged as failed and retried if max number of attempts not reached: {exception}"
263        )
264        traced_exception = (
265            exception
266            if isinstance(exception, AirbyteTracedException)
267            else AirbyteTracedException.from_exception(exception)
268        )
269        # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
270        # we would keep the exceptions in-memory until we know that we have reached the max attempt.
271        self._message_repository.emit_message(traced_exception.as_airbyte_message())
272        job = self._create_failed_job(_slice)
273        self._job_tracker.add_job(intent, job.api_job_id())
274        return job
276    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
277        job = AsyncJob(f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT)
278        job.update_status(AsyncJobStatus.FAILED)
279        return job
281    def _get_running_jobs(self) -> Set[AsyncJob]:
282        """
283        Returns a set of running AsyncJob objects.
285        Returns:
286            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
287        """
288        return {
289            job
290            for partition in self._running_partitions
291            for job in
292            if job.status() == AsyncJobStatus.RUNNING
293        }
295    def _update_jobs_status(self) -> None:
296        """
297        Update the status of all running jobs in the repository.
298        """
299        running_jobs = self._get_running_jobs()
300        if running_jobs:
301            # update the status only if there are RUNNING jobs
302            self._job_repository.update_jobs_status(running_jobs)
304    def _wait_on_status_update(self) -> None:
305        """
306        Waits for a specified amount of time between status updates.
309        This method is used to introduce a delay between status updates in order to avoid excessive polling.
310        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.
312        Returns:
313            None
314        """
315        lazy_log(
316            LOGGER,
317            logging.DEBUG,
318            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
319        )
321        lazy_log(
322            LOGGER,
323            logging.DEBUG,
324            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
325        )
328    def _process_completed_partition(self, partition: AsyncPartition) -> None:
329        """
330        Process a completed partition.
331        Args:
332            partition (AsyncPartition): The completed partition to process.
333        """
334        job_ids = list(map(lambda job: job.api_job_id(), {job for job in}))
336            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
337        )
339        # It is important to remove the jobs from the job tracker before yielding the partition as the caller might try to schedule jobs
340        # but won't be able to as all jobs slots are taken even though job is done.
341        for job in
342            self._job_tracker.remove_job(job.api_job_id())
344    def _process_running_partitions_and_yield_completed_ones(
345        self,
346    ) -> Generator[AsyncPartition, Any, None]:
347        """
348        Process the running partitions.
350        Yields:
351            AsyncPartition: The processed partition.
353        Raises:
354            Any: Any exception raised during processing.
355        """
356        current_running_partitions: List[AsyncPartition] = []
357        for partition in self._running_partitions:
358            match partition.status:
359                case AsyncJobStatus.COMPLETED:
360                    self._process_completed_partition(partition)
361                    yield partition
362                case AsyncJobStatus.RUNNING:
363                    current_running_partitions.append(partition)
364                case _ if partition.has_reached_max_attempt():
365                    self._stop_partition(partition)
366                    self._process_partitions_with_errors(partition)
367                case _:
368                    self._stop_timed_out_jobs(partition)
369                    # re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated
370                    self._reallocate_partition(current_running_partitions, partition)
372            # We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority
373            self._remove_completed_jobs(partition)
375        # update the referenced list with running partitions
376        self._running_partitions = current_running_partitions
378    def _stop_partition(self, partition: AsyncPartition) -> None:
379        for job in
380            if job.status() in _API_SIDE_RUNNING_STATUS:
381                self._abort_job(job, free_job_allocation=True)
382            else:
383                self._job_tracker.remove_job(job.api_job_id())
385    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
386        for job in
387            if job.status() == AsyncJobStatus.TIMED_OUT:
388                self._abort_job(job, free_job_allocation=False)
390    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
391        try:
392            self._job_repository.abort(job)
393            if free_job_allocation:
394                self._job_tracker.remove_job(job.api_job_id())
395        except Exception as exception:
396            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")
398    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
399        """
400        Remove completed or timed out jobs from the partition.
402        Args:
403            partition (AsyncPartition): The partition to process.
404        """
405        for job in
406            if job.status() == AsyncJobStatus.COMPLETED:
407                self._job_tracker.remove_job(job.api_job_id())
409    def _reallocate_partition(
410        self,
411        current_running_partitions: List[AsyncPartition],
412        partition: AsyncPartition,
413    ) -> None:
414        """
415        Reallocate the partition by starting a new job for each job in the
416        partition.
417        Args:
418            current_running_partitions (list): The list of currently running partitions.
419            partition (AsyncPartition): The partition to reallocate.
420        """
421        current_running_partitions.insert(0, partition)
423    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
424        """
425        Process a partition with status errors (FAILED and TIMEOUT).
427        Args:
428            partition (AsyncPartition): The partition to process.
429        Returns:
430            AirbyteTracedException: An exception indicating that at least one job could not be completed.
431        Raises:
432            AirbyteTracedException: If at least one job could not be completed.
433        """
434        status_by_job_id = {job.api_job_id(): job.status() for job in}
435        self._non_breaking_exceptions.append(
436            AirbyteTracedException(
437                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
438                failure_type=FailureType.config_error,
439            )
440        )
442    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
443        """
444        Creates and retrieves completed partitions.
445        This method continuously starts jobs, updates job status, processes running partitions,
446        logs polling partitions, and waits for status updates. It yields completed partitions
447        as they become available.
449        Returns:
450            An iterable of completed partitions, represented as AsyncPartition objects.
451            Each partition is wrapped in an Optional, allowing for None values.
452        """
453        while True:
454            try:
455                lazy_log(
456                    LOGGER,
457                    logging.DEBUG,
458                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
459                )
460                self._start_jobs()
461                if not self._slice_iterator.has_next() and not self._running_partitions:
462                    break
464                self._update_jobs_status()
465                yield from self._process_running_partitions_and_yield_completed_ones()
466                self._wait_on_status_update()
467            except Exception as exception:
468                LOGGER.warning(
469                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
470                )
471                if self._is_breaking_exception(exception):
472                    self._abort_all_running_jobs()
473                    raise exception
475                self._non_breaking_exceptions.append(exception)
478            f"JobOrchestrator loop - Thread (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
479        )
480        if self._non_breaking_exceptions:
481            # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
482            # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
483            raise AirbyteTracedException(
484                message="",
485                internal_message="\n".join(
486                    [
487                        filter_secrets(exception.__repr__())
488                        for exception in self._non_breaking_exceptions
489                    ]
490                ),
491                failure_type=FailureType.config_error,
492            )
494    def _handle_non_breaking_error(self, exception: Exception) -> None:
495        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
496        self._non_breaking_exceptions.append(exception)
498    def _abort_all_running_jobs(self) -> None:
499        for partition in self._running_partitions:
500            for job in
501                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
502                    self._abort_job(job, free_job_allocation=True)
503                self._job_tracker.remove_job(job.api_job_id())
505        self._running_partitions = []
507    def _is_breaking_exception(self, exception: Exception) -> bool:
508        return isinstance(exception, self._exceptions_to_break_on) or (
509            isinstance(exception, AirbyteTracedException)
510            and exception.failure_type == FailureType.config_error
511        )
513    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
514        """
515        Fetches records from the given jobs.
517        Args:
518            async_jobs Iterable[AsyncJob]: The list of AsyncJobs.
520        Yields:
521            Iterable[Mapping[str, Any]]: The fetched records from the jobs.
522        """
523        for job in async_jobs:
524            yield from self._job_repository.fetch_records(job)
525            self._job_repository.delete(job)
LOGGER = <Logger airbyte (INFO)>
class AsyncPartition:
 43class AsyncPartition:
 44    """
 45    This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs
 46    """
 50    def __init__(
 51        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
 52    ) -> None:
 53        self._attempts_per_job = {job: 1 for job in jobs}
 54        self._stream_slice = stream_slice
 55        self._job_max_retry = (
 56            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
 57        )
 59    def has_reached_max_attempt(self) -> bool:
 60        return any(
 61            map(
 62                lambda attempt_count: attempt_count >= self._job_max_retry,
 63                self._attempts_per_job.values(),
 64            )
 65        )
 67    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
 68        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
 69        if current_attempt_count is None:
 70            raise ValueError("Could not find job to replace")
 71        elif current_attempt_count >= self._job_max_retry:
 72            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")
 74        new_attempt_count = current_attempt_count + 1
 75        for job in new_jobs:
 76            self._attempts_per_job[job] = new_attempt_count
 78    def should_split(self, job: AsyncJob) -> bool:
 79        """
 80        Not used right now but once we support job split, we should split based on the number of attempts
 81        """
 82        return False
 84    @property
 85    def jobs(self) -> Iterable[AsyncJob]:
 86        return self._attempts_per_job.keys()
 88    @property
 89    def stream_slice(self) -> StreamSlice:
 90        return self._stream_slice
 92    @property
 93    def status(self) -> AsyncJobStatus:
 94        """
 95        Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.
 96        """
 97        statuses = set(map(lambda job: job.status(),
 98        if statuses == {AsyncJobStatus.COMPLETED}:
 99            return AsyncJobStatus.COMPLETED
100        elif AsyncJobStatus.FAILED in statuses:
101            return AsyncJobStatus.FAILED
102        elif AsyncJobStatus.TIMED_OUT in statuses:
103            return AsyncJobStatus.TIMED_OUT
104        else:
105            return AsyncJobStatus.RUNNING
107    def __repr__(self) -> str:
108        return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})"
110    def __json_serializable__(self) -> Any:
111        return self._stream_slice

This bucket of api_jobs is a bit useless for this iteration but should become interesting when we will be able to split jobs

AsyncPartition( jobs: List[airbyte_cdk.sources.declarative.async_job.job.AsyncJob], stream_slice: airbyte_cdk.StreamSlice, job_max_retry: Optional[int] = None)
50    def __init__(
51        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
52    ) -> None:
53        self._attempts_per_job = {job: 1 for job in jobs}
54        self._stream_slice = stream_slice
55        self._job_max_retry = (
56            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
57        )
def has_reached_max_attempt(self) -> bool:
59    def has_reached_max_attempt(self) -> bool:
60        return any(
61            map(
62                lambda attempt_count: attempt_count >= self._job_max_retry,
63                self._attempts_per_job.values(),
64            )
65        )
def replace_job( self, job_to_replace: airbyte_cdk.sources.declarative.async_job.job.AsyncJob, new_jobs: List[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> None:
67    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
68        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
69        if current_attempt_count is None:
70            raise ValueError("Could not find job to replace")
71        elif current_attempt_count >= self._job_max_retry:
72            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")
74        new_attempt_count = current_attempt_count + 1
75        for job in new_jobs:
76            self._attempts_per_job[job] = new_attempt_count
def should_split( self, job: airbyte_cdk.sources.declarative.async_job.job.AsyncJob) -> bool:
78    def should_split(self, job: AsyncJob) -> bool:
79        """
80        Not used right now but once we support job split, we should split based on the number of attempts
81        """
82        return False

Not used right now but once we support job split, we should split based on the number of attempts

84    @property
85    def jobs(self) -> Iterable[AsyncJob]:
86        return self._attempts_per_job.keys()
stream_slice: airbyte_cdk.StreamSlice
88    @property
89    def stream_slice(self) -> StreamSlice:
90        return self._stream_slice
 92    @property
 93    def status(self) -> AsyncJobStatus:
 94        """
 95        Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.
 96        """
 97        statuses = set(map(lambda job: job.status(),
 98        if statuses == {AsyncJobStatus.COMPLETED}:
 99            return AsyncJobStatus.COMPLETED
100        elif AsyncJobStatus.FAILED in statuses:
101            return AsyncJobStatus.FAILED
102        elif AsyncJobStatus.TIMED_OUT in statuses:
103            return AsyncJobStatus.TIMED_OUT
104        else:
105            return AsyncJobStatus.RUNNING

Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Else, it means everything is completed.

class LookaheadIterator(typing.Generic[~T]):
117class LookaheadIterator(Generic[T]):
118    def __init__(self, iterable: Iterable[T]) -> None:
119        self._iterator = iter(iterable)
120        self._buffer: List[T] = []
122    def __iter__(self) -> "LookaheadIterator[T]":
123        return self
125    def __next__(self) -> T:
126        if self._buffer:
127            return self._buffer.pop()
128        else:
129            return next(self._iterator)
131    def has_next(self) -> bool:
132        if self._buffer:
133            return True
135        try:
136            self._buffer = [next(self._iterator)]
137        except StopIteration:
138            return False
139        else:
140            return True
142    def add_at_the_beginning(self, item: T) -> None:
143        self._buffer = [item] + self._buffer

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
            # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

LookaheadIterator(iterable: Iterable[~T])
118    def __init__(self, iterable: Iterable[T]) -> None:
119        self._iterator = iter(iterable)
120        self._buffer: List[T] = []
def has_next(self) -> bool:
131    def has_next(self) -> bool:
132        if self._buffer:
133            return True
135        try:
136            self._buffer = [next(self._iterator)]
137        except StopIteration:
138            return False
139        else:
140            return True
def add_at_the_beginning(self, item: ~T) -> None:
142    def add_at_the_beginning(self, item: T) -> None:
143        self._buffer = [item] + self._buffer
class AsyncJobOrchestrator:
146class AsyncJobOrchestrator:
149        AsyncJobStatus.COMPLETED,
150        AsyncJobStatus.FAILED,
151        AsyncJobStatus.RUNNING,
152        AsyncJobStatus.TIMED_OUT,
153    }
154    _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}
156    def __init__(
157        self,
158        job_repository: AsyncJobRepository,
159        slices: Iterable[StreamSlice],
160        job_tracker: JobTracker,
161        message_repository: MessageRepository,
162        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
163        has_bulk_parent: bool = False,
164        job_max_retry: Optional[int] = None,
165    ) -> None:
166        """
167        If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`
168        needs to be set to True as jobs creation needs to be prioritized on the parent level. Doing otherwise could lead to a situation
169        where the child has taken up all the job budget without room to the parent to create more which would lead to an infinite loop of
170        "trying to start a parent job" and "ConcurrentJobLimitReached".
171        """
172        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
173            # this is to prevent developers updating the possible statuses without updating the logic of this class
174            raise ValueError(
175                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
176            )
178        self._job_repository: AsyncJobRepository = job_repository
179        self._slice_iterator = LookaheadIterator(slices)
180        self._running_partitions: List[AsyncPartition] = []
181        self._job_tracker = job_tracker
182        self._message_repository = message_repository
183        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
184        self._has_bulk_parent = has_bulk_parent
185        self._job_max_retry = job_max_retry
187        self._non_breaking_exceptions: List[Exception] = []
189    def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
190        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
191        jobs_to_replace = [job for job in if job.status() in failed_status_jobs]
192        for job in jobs_to_replace:
193            new_job = self._start_job(job.job_parameters(), job.api_job_id())
194            partition.replace_job(job, [new_job])
196    def _start_jobs(self) -> None:
197        """
198        Retry failed jobs and start jobs for each slice in the slice iterator.
199        This method iterates over the running jobs and slice iterator and starts a job for each slice.
200        The started jobs are added to the running partitions.
201        Returns:
202            None
204        However, the first iteration is for sendgrid which only has one job.
205        """
206        at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = False
207        _slice = None
208        try:
209            for partition in self._running_partitions:
210                self._replace_failed_jobs(partition)
212            if (
213                self._has_bulk_parent
214                and self._running_partitions
215                and self._slice_iterator.has_next()
216            ):
217                LOGGER.debug(
218                    "This AsyncJobOrchestrator is operating as a child of a bulk stream hence we limit the number of concurrent jobs on the child until there are no more parent slices to avoid the child taking all the API job budget"
219                )
220                return
222            for _slice in self._slice_iterator:
223                at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
224                job = self._start_job(_slice)
225                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
226                if self._has_bulk_parent and self._slice_iterator.has_next():
227                    break
228        except ConcurrentJobLimitReached:
229            if at_least_one_slice_consumed_from_slice_iterator_during_current_iteration:
230                # this means a slice has been consumed but the job couldn't be create therefore we need to put it back at the beginning of the _slice_iterator
231                self._slice_iterator.add_at_the_beginning(_slice)  # type: ignore  # we know it's not None here because `ConcurrentJobLimitReached` happens during the for loop
232            LOGGER.debug(
233                "Waiting before creating more jobs as the limit of concurrent jobs has been reached. Will try again later..."
234            )
236    def _start_job(self, _slice: StreamSlice, previous_job_id: Optional[str] = None) -> AsyncJob:
237        if previous_job_id:
238            id_to_replace = previous_job_id
239            lazy_log(LOGGER, logging.DEBUG, lambda: f"Attempting to replace job {id_to_replace}...")
240        else:
241            id_to_replace = self._job_tracker.try_to_get_intent()
243        try:
244            job = self._job_repository.start(_slice)
245            self._job_tracker.add_job(id_to_replace, job.api_job_id())
246            return job
247        except Exception as exception:
248            LOGGER.warning(f"Exception has occurred during job creation: {exception}")
249            if self._is_breaking_exception(exception):
250                self._job_tracker.remove_job(id_to_replace)
251                raise exception
252            return self._keep_api_budget_with_failed_job(_slice, exception, id_to_replace)
254    def _keep_api_budget_with_failed_job(
255        self, _slice: StreamSlice, exception: Exception, intent: str
256    ) -> AsyncJob:
257        """
258        We have a mechanism to retry job. It is used when a job status is FAILED or TIMED_OUT. The easiest way to retry is to have this job
259        as created in a failed state and leverage the retry for failed/timed out jobs. This way, we don't have to have another process for
260        retrying jobs that couldn't be started.
261        """
262        LOGGER.warning(
263            f"Could not start job for slice {_slice}. Job will be flagged as failed and retried if max number of attempts not reached: {exception}"
264        )
265        traced_exception = (
266            exception
267            if isinstance(exception, AirbyteTracedException)
268            else AirbyteTracedException.from_exception(exception)
269        )
270        # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
271        # we would keep the exceptions in-memory until we know that we have reached the max attempt.
272        self._message_repository.emit_message(traced_exception.as_airbyte_message())
273        job = self._create_failed_job(_slice)
274        self._job_tracker.add_job(intent, job.api_job_id())
275        return job
277    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
278        job = AsyncJob(f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT)
279        job.update_status(AsyncJobStatus.FAILED)
280        return job
282    def _get_running_jobs(self) -> Set[AsyncJob]:
283        """
284        Returns a set of running AsyncJob objects.
286        Returns:
287            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
288        """
289        return {
290            job
291            for partition in self._running_partitions
292            for job in
293            if job.status() == AsyncJobStatus.RUNNING
294        }
296    def _update_jobs_status(self) -> None:
297        """
298        Update the status of all running jobs in the repository.
299        """
300        running_jobs = self._get_running_jobs()
301        if running_jobs:
302            # update the status only if there are RUNNING jobs
303            self._job_repository.update_jobs_status(running_jobs)
305    def _wait_on_status_update(self) -> None:
306        """
307        Waits for a specified amount of time between status updates.
310        This method is used to introduce a delay between status updates in order to avoid excessive polling.
311        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.
313        Returns:
314            None
315        """
316        lazy_log(
317            LOGGER,
318            logging.DEBUG,
319            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
320        )
322        lazy_log(
323            LOGGER,
324            logging.DEBUG,
325            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
326        )
329    def _process_completed_partition(self, partition: AsyncPartition) -> None:
330        """
331        Process a completed partition.
332        Args:
333            partition (AsyncPartition): The completed partition to process.
334        """
335        job_ids = list(map(lambda job: job.api_job_id(), {job for job in}))
337            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
338        )
340        # It is important to remove the jobs from the job tracker before yielding the partition as the caller might try to schedule jobs
341        # but won't be able to as all jobs slots are taken even though job is done.
342        for job in
343            self._job_tracker.remove_job(job.api_job_id())
345    def _process_running_partitions_and_yield_completed_ones(
346        self,
347    ) -> Generator[AsyncPartition, Any, None]:
348        """
349        Process the running partitions.
351        Yields:
352            AsyncPartition: The processed partition.
354        Raises:
355            Any: Any exception raised during processing.
356        """
357        current_running_partitions: List[AsyncPartition] = []
358        for partition in self._running_partitions:
359            match partition.status:
360                case AsyncJobStatus.COMPLETED:
361                    self._process_completed_partition(partition)
362                    yield partition
363                case AsyncJobStatus.RUNNING:
364                    current_running_partitions.append(partition)
365                case _ if partition.has_reached_max_attempt():
366                    self._stop_partition(partition)
367                    self._process_partitions_with_errors(partition)
368                case _:
369                    self._stop_timed_out_jobs(partition)
370                    # re-allocate FAILED jobs, but TIMEOUT jobs are not re-allocated
371                    self._reallocate_partition(current_running_partitions, partition)
373            # We only remove completed / timeout jobs jobs as we want failed jobs to be re-allocated in priority
374            self._remove_completed_jobs(partition)
376        # update the referenced list with running partitions
377        self._running_partitions = current_running_partitions
379    def _stop_partition(self, partition: AsyncPartition) -> None:
380        for job in
381            if job.status() in _API_SIDE_RUNNING_STATUS:
382                self._abort_job(job, free_job_allocation=True)
383            else:
384                self._job_tracker.remove_job(job.api_job_id())
386    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
387        for job in
388            if job.status() == AsyncJobStatus.TIMED_OUT:
389                self._abort_job(job, free_job_allocation=False)
391    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
392        try:
393            self._job_repository.abort(job)
394            if free_job_allocation:
395                self._job_tracker.remove_job(job.api_job_id())
396        except Exception as exception:
397            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")
399    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
400        """
401        Remove completed or timed out jobs from the partition.
403        Args:
404            partition (AsyncPartition): The partition to process.
405        """
406        for job in
407            if job.status() == AsyncJobStatus.COMPLETED:
408                self._job_tracker.remove_job(job.api_job_id())
410    def _reallocate_partition(
411        self,
412        current_running_partitions: List[AsyncPartition],
413        partition: AsyncPartition,
414    ) -> None:
415        """
416        Reallocate the partition by starting a new job for each job in the
417        partition.
418        Args:
419            current_running_partitions (list): The list of currently running partitions.
420            partition (AsyncPartition): The partition to reallocate.
421        """
422        current_running_partitions.insert(0, partition)
424    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
425        """
426        Process a partition with status errors (FAILED and TIMEOUT).
428        Args:
429            partition (AsyncPartition): The partition to process.
430        Returns:
431            AirbyteTracedException: An exception indicating that at least one job could not be completed.
432        Raises:
433            AirbyteTracedException: If at least one job could not be completed.
434        """
435        status_by_job_id = {job.api_job_id(): job.status() for job in}
436        self._non_breaking_exceptions.append(
437            AirbyteTracedException(
438                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
439                failure_type=FailureType.config_error,
440            )
441        )
443    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
444        """
445        Creates and retrieves completed partitions.
446        This method continuously starts jobs, updates job status, processes running partitions,
447        logs polling partitions, and waits for status updates. It yields completed partitions
448        as they become available.
450        Returns:
451            An iterable of completed partitions, represented as AsyncPartition objects.
452            Each partition is wrapped in an Optional, allowing for None values.
453        """
454        while True:
455            try:
456                lazy_log(
457                    LOGGER,
458                    logging.DEBUG,
459                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
460                )
461                self._start_jobs()
462                if not self._slice_iterator.has_next() and not self._running_partitions:
463                    break
465                self._update_jobs_status()
466                yield from self._process_running_partitions_and_yield_completed_ones()
467                self._wait_on_status_update()
468            except Exception as exception:
469                LOGGER.warning(
470                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
471                )
472                if self._is_breaking_exception(exception):
473                    self._abort_all_running_jobs()
474                    raise exception
476                self._non_breaking_exceptions.append(exception)
479            f"JobOrchestrator loop - Thread (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
480        )
481        if self._non_breaking_exceptions:
482            # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
483            # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
484            raise AirbyteTracedException(
485                message="",
486                internal_message="\n".join(
487                    [
488                        filter_secrets(exception.__repr__())
489                        for exception in self._non_breaking_exceptions
490                    ]
491                ),
492                failure_type=FailureType.config_error,
493            )
495    def _handle_non_breaking_error(self, exception: Exception) -> None:
496        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
497        self._non_breaking_exceptions.append(exception)
499    def _abort_all_running_jobs(self) -> None:
500        for partition in self._running_partitions:
501            for job in
502                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
503                    self._abort_job(job, free_job_allocation=True)
504                self._job_tracker.remove_job(job.api_job_id())
506        self._running_partitions = []
508    def _is_breaking_exception(self, exception: Exception) -> bool:
509        return isinstance(exception, self._exceptions_to_break_on) or (
510            isinstance(exception, AirbyteTracedException)
511            and exception.failure_type == FailureType.config_error
512        )
514    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
515        """
516        Fetches records from the given jobs.
518        Args:
519            async_jobs Iterable[AsyncJob]: The list of AsyncJobs.
521        Yields:
522            Iterable[Mapping[str, Any]]: The fetched records from the jobs.
523        """
524        for job in async_jobs:
525            yield from self._job_repository.fetch_records(job)
526            self._job_repository.delete(job)
AsyncJobOrchestrator( job_repository: airbyte_cdk.sources.declarative.async_job.repository.AsyncJobRepository, slices: Iterable[airbyte_cdk.StreamSlice], job_tracker: airbyte_cdk.sources.declarative.async_job.job_tracker.JobTracker, message_repository: airbyte_cdk.MessageRepository, exceptions_to_break_on: Iterable[Type[Exception]] = (), has_bulk_parent: bool = False, job_max_retry: Optional[int] = None)
156    def __init__(
157        self,
158        job_repository: AsyncJobRepository,
159        slices: Iterable[StreamSlice],
160        job_tracker: JobTracker,
161        message_repository: MessageRepository,
162        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
163        has_bulk_parent: bool = False,
164        job_max_retry: Optional[int] = None,
165    ) -> None:
166        """
167        If the stream slices provided as a parameters relies on a async job streams that relies on the same JobTracker, `has_bulk_parent`
168        needs to be set to True as jobs creation needs to be prioritized on the parent level. Doing otherwise could lead to a situation
169        where the child has taken up all the job budget without room to the parent to create more which would lead to an infinite loop of
170        "trying to start a parent job" and "ConcurrentJobLimitReached".
171        """
172        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
173            # this is to prevent developers updating the possible statuses without updating the logic of this class
174            raise ValueError(
175                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
176            )
178        self._job_repository: AsyncJobRepository = job_repository
179        self._slice_iterator = LookaheadIterator(slices)
180        self._running_partitions: List[AsyncPartition] = []
181        self._job_tracker = job_tracker
182        self._message_repository = message_repository
183        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
184        self._has_bulk_parent = has_bulk_parent
185        self._job_max_retry = job_max_retry
187        self._non_breaking_exceptions: List[Exception] = []

If the stream slices provided as a parameter rely on an async job stream that relies on the same JobTracker, has_bulk_parent needs to be set to True as job creation needs to be prioritized on the parent level. Doing otherwise could lead to a situation where the child has taken up all the job budget without room for the parent to create more, which would lead to an infinite loop of "trying to start a parent job" and "ConcurrentJobLimitReached".

def create_and_get_completed_partitions( self) -> Iterable[AsyncPartition]:
443    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
444        """
445        Creates and retrieves completed partitions.
446        This method continuously starts jobs, updates job status, processes running partitions,
447        logs polling partitions, and waits for status updates. It yields completed partitions
448        as they become available.
450        Returns:
451            An iterable of completed partitions, represented as AsyncPartition objects.
452            Each partition is wrapped in an Optional, allowing for None values.
453        """
454        while True:
455            try:
456                lazy_log(
457                    LOGGER,
458                    logging.DEBUG,
459                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
460                )
461                self._start_jobs()
462                if not self._slice_iterator.has_next() and not self._running_partitions:
463                    break
465                self._update_jobs_status()
466                yield from self._process_running_partitions_and_yield_completed_ones()
467                self._wait_on_status_update()
468            except Exception as exception:
469                LOGGER.warning(
470                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
471                )
472                if self._is_breaking_exception(exception):
473                    self._abort_all_running_jobs()
474                    raise exception
476                self._non_breaking_exceptions.append(exception)
479            f"JobOrchestrator loop - Thread (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
480        )
481        if self._non_breaking_exceptions:
482            # We emitted traced message but we didn't break on non_breaking_exception. We still need to raise an exception so that the
483            # call of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
484            raise AirbyteTracedException(
485                message="",
486                internal_message="\n".join(
487                    [
488                        filter_secrets(exception.__repr__())
489                        for exception in self._non_breaking_exceptions
490                    ]
491                ),
492                failure_type=FailureType.config_error,
493            )

Creates and retrieves completed partitions. This method continuously starts jobs, updates job status, processes running partitions, logs polling partitions, and waits for status updates. It yields completed partitions as they become available.


An iterable of completed partitions, represented as AsyncPartition objects. Partitions are yielded as they complete; no None values are produced.

def fetch_records( self, async_jobs: Iterable[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> Iterable[Mapping[str, Any]]:
514    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
515        """
516        Fetches records from the given jobs.
518        Args:
519            async_jobs Iterable[AsyncJob]: The list of AsyncJobs.
521        Yields:
522            Iterable[Mapping[str, Any]]: The fetched records from the jobs.
523        """
524        for job in async_jobs:
525            yield from self._job_repository.fetch_records(job)
526            self._job_repository.delete(job)

Fetches records from the given jobs.

  • async_jobs Iterable[AsyncJob]: The list of AsyncJobs.

Iterable[Mapping[str, Any]]: The fetched records from the jobs.