airbyte_cdk.sources.declarative.async_job.job_orchestrator
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import logging
import threading
import time
import traceback
import uuid
from datetime import timedelta
from typing import (
    Any,
    Generator,
    Generic,
    Iterable,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
)

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_tracker import (
    ConcurrentJobLimitReached,
    JobTracker,
)
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

LOGGER = logging.getLogger("airbyte")
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}


class AsyncPartition:
    """
    This bucket of api_jobs is a bit useless for this iteration, but it should become interesting once we are able to split jobs.
    """

    _DEFAULT_MAX_JOB_RETRY = 3

    def __init__(
        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
    ) -> None:
        self._attempts_per_job = {job: 1 for job in jobs}
        self._stream_slice = stream_slice
        self._job_max_retry = (
            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
        )

    def has_reached_max_attempt(self) -> bool:
        return any(
            map(
                lambda attempt_count: attempt_count >= self._job_max_retry,
                self._attempts_per_job.values(),
            )
        )

    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
        if current_attempt_count is None:
            raise ValueError("Could not find job to replace")
        elif current_attempt_count >= self._job_max_retry:
            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")

        new_attempt_count = current_attempt_count + 1
        for job in new_jobs:
            self._attempts_per_job[job] = new_attempt_count

    def should_split(self, job: AsyncJob) -> bool:
        """
        Not used right now, but once we support job splitting, we should split based on the number of attempts.
        """
        return False

    @property
    def jobs(self) -> Iterable[AsyncJob]:
        return self._attempts_per_job.keys()

    @property
    def stream_slice(self) -> StreamSlice:
        return self._stream_slice

    @property
    def status(self) -> AsyncJobStatus:
        """
        Given the different job statuses, the priority is FAILED, then TIMED_OUT, then RUNNING. Otherwise, it means everything is completed.
        """
        statuses = set(map(lambda job: job.status(), self.jobs))
        if statuses == {AsyncJobStatus.COMPLETED}:
            return AsyncJobStatus.COMPLETED
        elif AsyncJobStatus.FAILED in statuses:
            return AsyncJobStatus.FAILED
        elif AsyncJobStatus.TIMED_OUT in statuses:
            return AsyncJobStatus.TIMED_OUT
        else:
            return AsyncJobStatus.RUNNING

    def __repr__(self) -> str:
        return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})"

    def __json_serializable__(self) -> Any:
        return self._stream_slice


T = TypeVar("T")


class LookaheadIterator(Generic[T]):
    def __init__(self, iterable: Iterable[T]) -> None:
        self._iterator = iter(iterable)
        self._buffer: List[T] = []

    def __iter__(self) -> "LookaheadIterator[T]":
        return self

    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.pop()
        else:
            return next(self._iterator)

    def has_next(self) -> bool:
        if self._buffer:
            return True

        try:
            self._buffer = [next(self._iterator)]
        except StopIteration:
            return False
        else:
            return True

    def add_at_the_beginning(self, item: T) -> None:
        self._buffer = [item] + self._buffer


class AsyncJobOrchestrator:
    _WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS = 5
    _KNOWN_JOB_STATUSES = {
        AsyncJobStatus.COMPLETED,
        AsyncJobStatus.FAILED,
        AsyncJobStatus.RUNNING,
        AsyncJobStatus.TIMED_OUT,
    }
    _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

    def __init__(
        self,
        job_repository: AsyncJobRepository,
        slices: Iterable[StreamSlice],
        job_tracker: JobTracker,
        message_repository: MessageRepository,
        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
        has_bulk_parent: bool = False,
        job_max_retry: Optional[int] = None,
    ) -> None:
        """
        If the stream slices provided as parameters rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
        needs to be set to True, as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
        where the child has taken up all the job budget without leaving room for the parent to create more, which would lead to an
        infinite loop of "trying to start a parent job" and "ConcurrentJobLimitReached".
        """
        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
            # this is to prevent developers from updating the possible statuses without updating the logic of this class
            raise ValueError(
                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
            )

        self._job_repository: AsyncJobRepository = job_repository
        self._slice_iterator = LookaheadIterator(slices)
        self._running_partitions: List[AsyncPartition] = []
        self._job_tracker = job_tracker
        self._message_repository = message_repository
        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
        self._has_bulk_parent = has_bulk_parent
        self._job_max_retry = job_max_retry

        self._non_breaking_exceptions: List[Exception] = []

    def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
        jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
        for job in jobs_to_replace:
            new_job = self._start_job(job.job_parameters(), job.api_job_id())
            partition.replace_job(job, [new_job])

    def _start_jobs(self) -> None:
        """
        Retry failed jobs and start jobs for each slice in the slice iterator.
        This method iterates over the running jobs and slice iterator and starts a job for each slice.
        The started jobs are added to the running partitions.
        Returns:
            None

        However, the first iteration is for sendgrid which only has one job.
        """
        at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = False
        _slice = None
        try:
            for partition in self._running_partitions:
                self._replace_failed_jobs(partition)

            if (
                self._has_bulk_parent
                and self._running_partitions
                and self._slice_iterator.has_next()
            ):
                LOGGER.debug(
                    "This AsyncJobOrchestrator is operating as a child of a bulk stream hence we limit the number of concurrent jobs on the child until there are no more parent slices to avoid the child taking all the API job budget"
                )
                return

            for _slice in self._slice_iterator:
                at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
                job = self._start_job(_slice)
                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
                if self._has_bulk_parent and self._slice_iterator.has_next():
                    break
        except ConcurrentJobLimitReached:
            if at_least_one_slice_consumed_from_slice_iterator_during_current_iteration:
                # this means a slice has been consumed but the job couldn't be created, therefore we need to put it back at the beginning of the _slice_iterator
                self._slice_iterator.add_at_the_beginning(_slice)  # type: ignore  # we know it's not None here because `ConcurrentJobLimitReached` happens during the for loop
            LOGGER.debug(
                "Waiting before creating more jobs as the limit of concurrent jobs has been reached. Will try again later..."
            )

    def _start_job(self, _slice: StreamSlice, previous_job_id: Optional[str] = None) -> AsyncJob:
        if previous_job_id:
            id_to_replace = previous_job_id
            lazy_log(LOGGER, logging.DEBUG, lambda: f"Attempting to replace job {id_to_replace}...")
        else:
            id_to_replace = self._job_tracker.try_to_get_intent()

        try:
            job = self._job_repository.start(_slice)
            self._job_tracker.add_job(id_to_replace, job.api_job_id())
            return job
        except Exception as exception:
            LOGGER.warning(f"Exception has occurred during job creation: {exception}")
            if self._is_breaking_exception(exception):
                self._job_tracker.remove_job(id_to_replace)
                raise exception
            return self._keep_api_budget_with_failed_job(_slice, exception, id_to_replace)

    def _keep_api_budget_with_failed_job(
        self, _slice: StreamSlice, exception: Exception, intent: str
    ) -> AsyncJob:
        """
        We have a mechanism to retry jobs. It is used when a job status is FAILED or TIMED_OUT. The easiest way to retry is to have this job
        created in a failed state and leverage the retry for failed/timed out jobs. This way, we don't need another process for
        retrying jobs that couldn't be started.
        """
        LOGGER.warning(
            f"Could not start job for slice {_slice}. Job will be flagged as failed and retried if max number of attempts not reached: {exception}"
        )
        traced_exception = (
            exception
            if isinstance(exception, AirbyteTracedException)
            else AirbyteTracedException.from_exception(exception)
        )
        # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
        # we would keep the exceptions in-memory until we know that we have reached the max attempt.
        self._message_repository.emit_message(traced_exception.as_airbyte_message())
        job = self._create_failed_job(_slice)
        self._job_tracker.add_job(intent, job.api_job_id())
        return job

    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
        job = AsyncJob(f"{uuid.uuid4()} - Job that could not start", stream_slice, _NO_TIMEOUT)
        job.update_status(AsyncJobStatus.FAILED)
        return job

    def _get_running_jobs(self) -> Set[AsyncJob]:
        """
        Returns a set of running AsyncJob objects.

        Returns:
            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
        """
        return {
            job
            for partition in self._running_partitions
            for job in partition.jobs
            if job.status() == AsyncJobStatus.RUNNING
        }

    def _update_jobs_status(self) -> None:
        """
        Update the status of all running jobs in the repository.
        """
        running_jobs = self._get_running_jobs()
        if running_jobs:
            # update the status only if there are RUNNING jobs
            self._job_repository.update_jobs_status(running_jobs)

    def _wait_on_status_update(self) -> None:
        """
        Waits for a specified amount of time between status updates.

        This method is used to introduce a delay between status updates in order to avoid excessive polling.
        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.

        Returns:
            None
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
        )

        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
        )
        time.sleep(self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS)

    def _process_completed_partition(self, partition: AsyncPartition) -> None:
        """
        Process a completed partition.
        Args:
            partition (AsyncPartition): The completed partition to process.
        """
        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
        LOGGER.info(
            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
        )

        # It is important to remove the jobs from the job tracker before yielding the partition, as the caller might try to schedule jobs
        # but won't be able to because all job slots are taken even though the job is done.
        for job in partition.jobs:
            self._job_tracker.remove_job(job.api_job_id())

    def _process_running_partitions_and_yield_completed_ones(
        self,
    ) -> Generator[AsyncPartition, Any, None]:
        """
        Process the running partitions.

        Yields:
            AsyncPartition: The processed partition.

        Raises:
            Any: Any exception raised during processing.
        """
        current_running_partitions: List[AsyncPartition] = []
        for partition in self._running_partitions:
            match partition.status:
                case AsyncJobStatus.COMPLETED:
                    self._process_completed_partition(partition)
                    yield partition
                case AsyncJobStatus.RUNNING:
                    current_running_partitions.append(partition)
                case _ if partition.has_reached_max_attempt():
                    self._stop_partition(partition)
                    self._process_partitions_with_errors(partition)
                case _:
                    self._stop_timed_out_jobs(partition)
                    # re-allocate FAILED jobs, but TIMED_OUT jobs are not re-allocated
                    self._reallocate_partition(current_running_partitions, partition)

            # We only remove completed / timed out jobs as we want failed jobs to be re-allocated in priority
            self._remove_completed_jobs(partition)

        # update the referenced list with running partitions
        self._running_partitions = current_running_partitions

    def _stop_partition(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() in _API_SIDE_RUNNING_STATUS:
                self._abort_job(job, free_job_allocation=True)
            else:
                self._job_tracker.remove_job(job.api_job_id())

    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.TIMED_OUT:
                self._abort_job(job, free_job_allocation=False)

    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
        try:
            self._job_repository.abort(job)
            if free_job_allocation:
                self._job_tracker.remove_job(job.api_job_id())
        except Exception as exception:
            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")

    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
        """
        Remove completed jobs from the job tracker for the given partition.

        Args:
            partition (AsyncPartition): The partition to process.
        """
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.COMPLETED:
                self._job_tracker.remove_job(job.api_job_id())

    def _reallocate_partition(
        self,
        current_running_partitions: List[AsyncPartition],
        partition: AsyncPartition,
    ) -> None:
        """
        Re-allocate the partition by putting it back at the front of the running partitions so that
        its remaining jobs are retried in priority.

        Args:
            current_running_partitions (list): The list of currently running partitions.
            partition (AsyncPartition): The partition to reallocate.
        """
        current_running_partitions.insert(0, partition)

    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
        """
        Process a partition with error statuses (FAILED or TIMED_OUT) by recording a non-breaking
        AirbyteTracedException indicating that at least one job could not be completed.

        Args:
            partition (AsyncPartition): The partition to process.
        """
        status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
        self._non_breaking_exceptions.append(
            AirbyteTracedException(
                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
                failure_type=FailureType.config_error,
            )
        )

    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
        """
        Creates and retrieves completed partitions.
        This method continuously starts jobs, updates job statuses, processes running partitions,
        and waits between status polls. It yields completed partitions as they become available.

        Returns:
            An iterable of completed partitions, represented as AsyncPartition objects.
        """
        while True:
            try:
                lazy_log(
                    LOGGER,
                    logging.DEBUG,
                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
                )
                self._start_jobs()
                if not self._slice_iterator.has_next() and not self._running_partitions:
                    break

                self._update_jobs_status()
                yield from self._process_running_partitions_and_yield_completed_ones()
                self._wait_on_status_update()
            except Exception as exception:
                LOGGER.warning(
                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
                )
                if self._is_breaking_exception(exception):
                    self._abort_all_running_jobs()
                    raise exception

                self._non_breaking_exceptions.append(exception)

        LOGGER.info(
            f"JobOrchestrator loop - Thread (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
        )
        if self._non_breaking_exceptions:
            # We emitted traced messages but we didn't break on non-breaking exceptions. We still need to raise an exception so that the
            # caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
            raise AirbyteTracedException(
                message="",
                internal_message="\n".join(
                    [
                        filter_secrets(exception.__repr__())
                        for exception in self._non_breaking_exceptions
                    ]
                ),
                failure_type=FailureType.config_error,
            )

    def _handle_non_breaking_error(self, exception: Exception) -> None:
        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
        self._non_breaking_exceptions.append(exception)

    def _abort_all_running_jobs(self) -> None:
        for partition in self._running_partitions:
            for job in partition.jobs:
                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
                    self._abort_job(job, free_job_allocation=True)
                self._job_tracker.remove_job(job.api_job_id())

        self._running_partitions = []

    def _is_breaking_exception(self, exception: Exception) -> bool:
        return isinstance(exception, self._exceptions_to_break_on) or (
            isinstance(exception, AirbyteTracedException)
            and exception.failure_type == FailureType.config_error
        )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        Fetches records from the given jobs.

        Args:
            async_jobs (Iterable[AsyncJob]): The jobs to fetch records from.

        Yields:
            Mapping[str, Any]: The fetched records from the jobs.
        """
        for job in async_jobs:
            yield from self._job_repository.fetch_records(job)
            self._job_repository.delete(job)
class AsyncPartition:
This bucket of api_jobs is a bit useless for this iteration, but it should become interesting once we are able to split jobs.
def should_split(self, job: AsyncJob) -> bool:
Not used right now, but once we support job splitting, we should split based on the number of attempts.
status: AsyncJobStatus
Given the different job statuses, the priority is FAILED, then TIMED_OUT, then RUNNING. Otherwise, it means everything is completed.
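As a small illustration of that precedence, here is a sketch. It assumes AsyncJob can be built as AsyncJob(api_job_id, stream_slice, timeout), mirroring how this module creates failed jobs, and that StreamSlice accepts partition and cursor_slice keyword arguments; both constructors live outside this module, so treat them as assumptions.

from datetime import timedelta

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.types import StreamSlice

_slice = StreamSlice(partition={"parent_id": "1"}, cursor_slice={})
completed_job = AsyncJob("job-1", _slice, timedelta(minutes=30))
failed_job = AsyncJob("job-2", _slice, timedelta(minutes=30))
completed_job.update_status(AsyncJobStatus.COMPLETED)
failed_job.update_status(AsyncJobStatus.FAILED)

partition = AsyncPartition([completed_job, failed_job], _slice)
assert partition.status == AsyncJobStatus.FAILED  # FAILED takes precedence over COMPLETED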
class LookaheadIterator(Generic[T]):
class AsyncJobOrchestrator:
AsyncJobOrchestrator(
    job_repository: AsyncJobRepository,
    slices: Iterable[StreamSlice],
    job_tracker: JobTracker,
    message_repository: MessageRepository,
    exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
    has_bulk_parent: bool = False,
    job_max_retry: Optional[int] = None,
)
If the stream slices provided as parameters rely on async job streams that rely on the same JobTracker, has_bulk_parent
needs to be set to True, as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
where the child has taken up all the job budget without leaving room for the parent to create more, which would lead to an infinite loop of
"trying to start a parent job" and "ConcurrentJobLimitReached". A usage sketch showing a parent and a child orchestrator sharing one JobTracker follows the create_and_get_completed_partitions entry below.
def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
Creates and retrieves completed partitions. This method continuously starts jobs, updates job statuses, processes running partitions, and waits between status polls. It yields completed partitions as they become available.

Returns:

An iterable of completed partitions, represented as AsyncPartition objects.
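As a usage sketch (not part of this module), the code below wires up a parent and a child orchestrator that share one JobTracker, with has_bulk_parent=True on the child, and a small driver that consumes completed partitions. The JobTracker(limit=...) constructor and NoopMessageRepository come from other CDK modules and are assumptions here; the repository parameters stand for any AsyncJobRepository implementation (one is sketched after fetch_records below).

from typing import Any, Iterable, Mapping, Tuple

from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.message import NoopMessageRepository
from airbyte_cdk.sources.types import StreamSlice


def build_orchestrators(
    parent_repository: AsyncJobRepository,
    child_repository: AsyncJobRepository,
    parent_slices: Iterable[StreamSlice],
    child_slices: Iterable[StreamSlice],
) -> Tuple[AsyncJobOrchestrator, AsyncJobOrchestrator]:
    # One tracker shared by both levels so that they draw from the same job budget.
    shared_tracker = JobTracker(limit=5)  # assumed constructor: maximum number of concurrent jobs
    message_repository = NoopMessageRepository()

    parent = AsyncJobOrchestrator(
        job_repository=parent_repository,
        slices=parent_slices,
        job_tracker=shared_tracker,
        message_repository=message_repository,
    )
    # The child's slices come from parent jobs tracked by the same JobTracker, so it sets
    # has_bulk_parent=True and leaves job slots to the parent while parent slices remain.
    child = AsyncJobOrchestrator(
        job_repository=child_repository,
        slices=child_slices,
        job_tracker=shared_tracker,
        message_repository=message_repository,
        has_bulk_parent=True,
    )
    return parent, child


def read_all_records(orchestrator: AsyncJobOrchestrator) -> Iterable[Mapping[str, Any]]:
    # Blocks between status polls; raises AirbyteTracedException at the end if some
    # partitions could not be completed (non-breaking errors).
    for partition in orchestrator.create_and_get_completed_partitions():
        # Every job of a yielded partition is COMPLETED; fetch_records also deletes each job
        # from the repository once its records have been read.
        yield from orchestrator.fetch_records(partition.jobs)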
def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
Fetches records from the given jobs.

Arguments:

- async_jobs (Iterable[AsyncJob]): The jobs to fetch records from.

Yields:

Mapping[str, Any]: The fetched records from the jobs.
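For context on what the orchestrator expects from its repository, here is a minimal in-memory sketch that could be passed to the wiring example above. It assumes the AsyncJobRepository interface consists of the methods this orchestrator calls (start, update_jobs_status, fetch_records, abort, delete) and that jobs complete instantly; treat it as illustrative, not as the CDK's reference implementation.

import uuid
from datetime import timedelta
from typing import Any, Dict, Iterable, List, Mapping, Set

from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.types import StreamSlice


class InMemoryJobRepository(AsyncJobRepository):
    """Hypothetical repository whose jobs complete immediately with canned records."""

    def __init__(self, records_per_slice: List[Mapping[str, Any]]) -> None:
        self._records_per_slice = records_per_slice
        self._records_by_job_id: Dict[str, List[Mapping[str, Any]]] = {}

    def start(self, stream_slice: StreamSlice) -> AsyncJob:
        # Mirrors how this module builds jobs: AsyncJob(api_job_id, stream_slice, timeout).
        job = AsyncJob(str(uuid.uuid4()), stream_slice, timedelta(minutes=30))
        self._records_by_job_id[job.api_job_id()] = list(self._records_per_slice)
        return job

    def update_jobs_status(self, jobs: Set[AsyncJob]) -> None:
        # A real implementation would poll the API; here every job is done right away.
        for job in jobs:
            job.update_status(AsyncJobStatus.COMPLETED)

    def fetch_records(self, job: AsyncJob) -> Iterable[Mapping[str, Any]]:
        yield from self._records_by_job_id.get(job.api_job_id(), [])

    def abort(self, job: AsyncJob) -> None:
        # Nothing to cancel in memory; a real implementation would cancel the API-side job.
        pass

    def delete(self, job: AsyncJob) -> None:
        # Called by AsyncJobOrchestrator.fetch_records once the job's records have been consumed.
        self._records_by_job_id.pop(job.api_job_id(), None)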