airbyte_cdk.sources.declarative.async_job.job_orchestrator
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import logging
import threading
import time
import traceback
import uuid
from datetime import datetime, timedelta, timezone
from typing import (
    Any,
    Generator,
    Generic,
    Iterable,
    List,
    Mapping,
    Optional,
    Set,
    Tuple,
    Type,
    TypeVar,
)

from airbyte_cdk.logger import lazy_log
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.job_tracker import (
    ConcurrentJobLimitReached,
    JobTracker,
)
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

LOGGER = logging.getLogger("airbyte")
_NO_TIMEOUT = timedelta.max
_API_SIDE_RUNNING_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}


class AsyncPartition:
    """
    This bucket of api_jobs is not very useful for this iteration but should become interesting once we are able to split jobs
    """

    _DEFAULT_MAX_JOB_RETRY = 3

    def __init__(
        self, jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None
    ) -> None:
        self._attempts_per_job = {job: 1 for job in jobs}
        self._stream_slice = stream_slice
        self._job_max_retry = (
            job_max_retry if job_max_retry is not None else self._DEFAULT_MAX_JOB_RETRY
        )

    def has_reached_max_attempt(self) -> bool:
        return any(
            map(
                lambda attempt_count: attempt_count >= self._job_max_retry,
                self._attempts_per_job.values(),
            )
        )

    def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
        current_attempt_count = self._attempts_per_job.pop(job_to_replace, None)
        if current_attempt_count is None:
            raise ValueError("Could not find job to replace")
        elif current_attempt_count >= self._job_max_retry:
            raise ValueError(f"Max attempt reached for job in partition {self._stream_slice}")

        new_attempt_count = current_attempt_count + 1
        for job in new_jobs:
            self._attempts_per_job[job] = new_attempt_count

    def should_split(self, job: AsyncJob) -> bool:
        """
        Not used right now, but once we support job splitting, we should split based on the number of attempts
        """
        return False

    @property
    def jobs(self) -> Iterable[AsyncJob]:
        return self._attempts_per_job.keys()

    @property
    def stream_slice(self) -> StreamSlice:
        return self._stream_slice

    @property
    def status(self) -> AsyncJobStatus:
        """
        Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Otherwise, everything is completed
        or skipped. A partition is SKIPPED only when all jobs are SKIPPED; a mix of COMPLETED and SKIPPED resolves to COMPLETED.
        """
        statuses = set(map(lambda job: job.status(), self.jobs))
        if statuses == {AsyncJobStatus.COMPLETED}:
            return AsyncJobStatus.COMPLETED
        elif statuses == {AsyncJobStatus.SKIPPED}:
            return AsyncJobStatus.SKIPPED
        elif statuses <= {AsyncJobStatus.COMPLETED, AsyncJobStatus.SKIPPED}:
            # Mix of completed and skipped — treat as completed so records are fetched for the completed jobs
            return AsyncJobStatus.COMPLETED
        elif AsyncJobStatus.FAILED in statuses:
            return AsyncJobStatus.FAILED
        elif AsyncJobStatus.TIMED_OUT in statuses:
            return AsyncJobStatus.TIMED_OUT
        else:
            return AsyncJobStatus.RUNNING

    def __repr__(self) -> str:
        return f"AsyncPartition(stream_slice={self._stream_slice}, attempt_per_job={self._attempts_per_job})"

    def __json_serializable__(self) -> Any:
        return self._stream_slice


T = TypeVar("T")


class LookaheadIterator(Generic[T]):
    def __init__(self, iterable: Iterable[T]) -> None:
        self._iterator = iter(iterable)
        self._buffer: List[T] = []

    def __iter__(self) -> "LookaheadIterator[T]":
        return self

    def __next__(self) -> T:
        if self._buffer:
            return self._buffer.pop()
        else:
            return next(self._iterator)

    def has_next(self) -> bool:
        if self._buffer:
            return True

        try:
            self._buffer = [next(self._iterator)]
        except StopIteration:
            return False
        else:
            return True

    def add_at_the_beginning(self, item: T) -> None:
        self._buffer = [item] + self._buffer


class AsyncJobOrchestrator:
    _WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS = 5
    _KNOWN_JOB_STATUSES = {
        AsyncJobStatus.COMPLETED,
        AsyncJobStatus.FAILED,
        AsyncJobStatus.RUNNING,
        AsyncJobStatus.TIMED_OUT,
        AsyncJobStatus.SKIPPED,
    }
    _RUNNING_ON_API_SIDE_STATUS = {AsyncJobStatus.RUNNING, AsyncJobStatus.TIMED_OUT}

    def __init__(
        self,
        job_repository: AsyncJobRepository,
        slices: Iterable[StreamSlice],
        job_tracker: JobTracker,
        message_repository: MessageRepository,
        exceptions_to_break_on: Iterable[Type[Exception]] = tuple(),
        has_bulk_parent: bool = False,
        job_max_retry: Optional[int] = None,
        failed_retry_wait_time_in_seconds: Optional[int] = None,
    ) -> None:
        """
        If the stream slices provided as a parameter rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
        needs to be set to True as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
        where the child has taken up all the job budget without room for the parent to create more, which would lead to an infinite loop of
        "trying to start a parent job" and "ConcurrentJobLimitReached".
        """
        if {*AsyncJobStatus} != self._KNOWN_JOB_STATUSES:
            # this is to prevent developers updating the possible statuses without updating the logic of this class
            raise ValueError(
                "An AsyncJobStatus has been either removed or added which means the logic of this class needs to be reviewed. Once the logic has been updated, please update _KNOWN_JOB_STATUSES"
            )

        if failed_retry_wait_time_in_seconds is not None and failed_retry_wait_time_in_seconds <= 0:
            raise ValueError("failed_retry_wait_time_in_seconds must be >= 1")

        self._job_repository: AsyncJobRepository = job_repository
        self._slice_iterator = LookaheadIterator(slices)
        self._running_partitions: List[AsyncPartition] = []
        self._job_tracker = job_tracker
        self._message_repository = message_repository
        self._exceptions_to_break_on: Tuple[Type[Exception], ...] = tuple(exceptions_to_break_on)
        self._has_bulk_parent = has_bulk_parent
        self._job_max_retry = job_max_retry
        self._failed_retry_wait_time_in_seconds = failed_retry_wait_time_in_seconds

        self._non_breaking_exceptions: List[Exception] = []

    def _replace_failed_jobs(self, partition: AsyncPartition) -> None:
        failed_status_jobs = (AsyncJobStatus.FAILED, AsyncJobStatus.TIMED_OUT)
        jobs_to_replace = [job for job in partition.jobs if job.status() in failed_status_jobs]
        for job in jobs_to_replace:
            if (
                self._failed_retry_wait_time_in_seconds is not None
                and job.status() == AsyncJobStatus.FAILED
                and not job.is_creation_failure()
            ):
                if not job.ready_to_retry():
                    lazy_log(
                        LOGGER,
                        logging.DEBUG,
                        lambda: f"Job {job.api_job_id()} is not ready to retry yet (deferred). Skipping.",
                    )
                    continue
                if not job.retry_deferred():
                    job.set_retry_after(
                        datetime.now(tz=timezone.utc)
                        + timedelta(seconds=self._failed_retry_wait_time_in_seconds)
                    )
                    lazy_log(
                        LOGGER,
                        logging.INFO,
                        lambda: f"Job {job.api_job_id()} failed. Deferring retry for {self._failed_retry_wait_time_in_seconds} seconds.",
                    )
                    continue
            new_job = self._start_job(job.job_parameters(), job.api_job_id())
            partition.replace_job(job, [new_job])

    def _start_jobs(self) -> None:
        """
        Retry failed jobs and start a job for each slice in the slice iterator.
        The started jobs are added to the running partitions.

        Note: the first iteration is for sendgrid, which only has one job.
        """
        at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = False
        _slice = None
        try:
            for partition in self._running_partitions:
                self._replace_failed_jobs(partition)

            if (
                self._has_bulk_parent
                and self._running_partitions
                and self._slice_iterator.has_next()
            ):
                LOGGER.debug(
                    "This AsyncJobOrchestrator is operating as a child of a bulk stream hence we limit the number of concurrent jobs on the child until there are no more parent slices to avoid the child taking all the API job budget"
                )
                return

            for _slice in self._slice_iterator:
                at_least_one_slice_consumed_from_slice_iterator_during_current_iteration = True
                job = self._start_job(_slice)
                self._running_partitions.append(AsyncPartition([job], _slice, self._job_max_retry))
                if self._has_bulk_parent and self._slice_iterator.has_next():
                    break
        except ConcurrentJobLimitReached:
            if at_least_one_slice_consumed_from_slice_iterator_during_current_iteration:
                # this means a slice has been consumed but the job couldn't be created, therefore we need to put it back at the beginning of the _slice_iterator
                self._slice_iterator.add_at_the_beginning(_slice)  # type: ignore  # we know it's not None here because `ConcurrentJobLimitReached` happens during the for loop
            LOGGER.debug(
                "Waiting before creating more jobs as the limit of concurrent jobs has been reached. Will try again later..."
            )

    def _start_job(self, _slice: StreamSlice, previous_job_id: Optional[str] = None) -> AsyncJob:
        if previous_job_id:
            id_to_replace = previous_job_id
            lazy_log(LOGGER, logging.DEBUG, lambda: f"Attempting to replace job {id_to_replace}...")
        else:
            id_to_replace = self._job_tracker.try_to_get_intent()

        try:
            job = self._job_repository.start(_slice)
            self._job_tracker.add_job(id_to_replace, job.api_job_id())
            return job
        except Exception as exception:
            LOGGER.warning(f"Exception has occurred during job creation: {exception}")
            if self._is_breaking_exception(exception):
                self._job_tracker.remove_job(id_to_replace)
                raise exception
            return self._keep_api_budget_with_failed_job(_slice, exception, id_to_replace)

    def _keep_api_budget_with_failed_job(
        self, _slice: StreamSlice, exception: Exception, intent: str
    ) -> AsyncJob:
        """
        We have a mechanism to retry jobs. It is used when a job status is FAILED or TIMED_OUT. The easiest way to retry is to have this job
        as created in a failed state and leverage the retry for failed/timed out jobs. This way, we don't have to have another process for
        retrying jobs that couldn't be started.
        """
        LOGGER.warning(
            f"Could not start job for slice {_slice}. Job will be flagged as failed and retried if max number of attempts not reached: {exception}"
        )
        traced_exception = (
            exception
            if isinstance(exception, AirbyteTracedException)
            else AirbyteTracedException.from_exception(exception)
        )
        # Even though we're not sure this will break the stream, we will emit here for simplicity's sake. If we wanted to be more accurate,
        # we would keep the exceptions in-memory until we know that we have reached the max attempt.
        self._message_repository.emit_message(traced_exception.as_airbyte_message())
        job = self._create_failed_job(_slice)
        self._job_tracker.add_job(intent, job.api_job_id())
        return job

    def _create_failed_job(self, stream_slice: StreamSlice) -> AsyncJob:
        job = AsyncJob(
            f"{uuid.uuid4()} - Job that could not start",
            stream_slice,
            _NO_TIMEOUT,
            is_creation_failure=True,
        )
        job.update_status(AsyncJobStatus.FAILED)
        return job

    def _get_running_jobs(self) -> Set[AsyncJob]:
        """
        Returns a set of running AsyncJob objects.

        Returns:
            Set[AsyncJob]: A set of AsyncJob objects that are currently running.
        """
        return {
            job
            for partition in self._running_partitions
            for job in partition.jobs
            if job.status() == AsyncJobStatus.RUNNING
        }

    def _update_jobs_status(self) -> None:
        """
        Update the status of all running jobs in the repository.
        """
        running_jobs = self._get_running_jobs()
        if running_jobs:
            # update the status only if there are RUNNING jobs
            self._job_repository.update_jobs_status(running_jobs)

    def _wait_on_status_update(self) -> None:
        """
        Waits for a specified amount of time between status updates.

        This method is used to introduce a delay between status updates in order to avoid excessive polling.
        The duration of the delay is determined by the value of `_WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS`.

        Returns:
            None
        """
        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Polling status in progress. There are currently {len(self._running_partitions)} running partitions.",
        )

        lazy_log(
            LOGGER,
            logging.DEBUG,
            lambda: f"Waiting for {self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS} seconds before next poll...",
        )
        time.sleep(self._WAIT_TIME_BETWEEN_STATUS_UPDATE_IN_SECONDS)

    def _process_completed_partition(self, partition: AsyncPartition) -> None:
        """
        Process a completed partition.
        Args:
            partition (AsyncPartition): The completed partition to process.
        """
        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
        LOGGER.info(
            f"The following jobs for stream slice {partition.stream_slice} have been completed: {job_ids}."
        )

        # It is important to remove the jobs from the job tracker before yielding the partition as the caller might try to schedule jobs
        # but won't be able to as all job slots are taken even though the job is done.
        for job in partition.jobs:
            self._job_tracker.remove_job(job.api_job_id())

    def _process_skipped_partition(self, partition: AsyncPartition) -> None:
        """
        Process a skipped partition. The API indicated there is no data to return for this job
        (e.g. Amazon SP-API CANCELLED status means no data to report). We clean up the job
        allocation without fetching any records or raising errors.
        """
        job_ids = list(map(lambda job: job.api_job_id(), {job for job in partition.jobs}))
        LOGGER.info(
            f"The following jobs for stream slice {partition.stream_slice} have been skipped (no data to return): {job_ids}."
        )
        for job in partition.jobs:
            self._job_tracker.remove_job(job.api_job_id())

    def _process_running_partitions_and_yield_completed_ones(
        self,
    ) -> Generator[AsyncPartition, Any, None]:
        """
        Process the running partitions.

        Yields:
            AsyncPartition: The processed partition.

        Raises:
            Any: Any exception raised during processing.
        """
        current_running_partitions: List[AsyncPartition] = []
        for partition in self._running_partitions:
            match partition.status:
                case AsyncJobStatus.COMPLETED:
                    self._process_completed_partition(partition)
                    yield partition
                case AsyncJobStatus.SKIPPED:
                    self._process_skipped_partition(partition)
                case AsyncJobStatus.RUNNING:
                    current_running_partitions.append(partition)
                case _ if partition.has_reached_max_attempt():
                    self._stop_partition(partition)
                    self._process_partitions_with_errors(partition)
                case _:
                    self._stop_timed_out_jobs(partition)
                    # re-allocate FAILED jobs, but TIMED_OUT jobs are not re-allocated
                    self._reallocate_partition(current_running_partitions, partition)

            # We only remove completed / timed out jobs as we want failed jobs to be re-allocated in priority
            self._remove_completed_jobs(partition)

        # update the referenced list with running partitions
        self._running_partitions = current_running_partitions

    def _stop_partition(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() in _API_SIDE_RUNNING_STATUS:
                self._abort_job(job, free_job_allocation=True)
            else:
                self._job_tracker.remove_job(job.api_job_id())

    def _stop_timed_out_jobs(self, partition: AsyncPartition) -> None:
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.TIMED_OUT:
                self._abort_job(job, free_job_allocation=False)

    def _abort_job(self, job: AsyncJob, free_job_allocation: bool = True) -> None:
        try:
            self._job_repository.abort(job)
            if free_job_allocation:
                self._job_tracker.remove_job(job.api_job_id())
        except Exception as exception:
            LOGGER.warning(f"Could not free budget for job {job.api_job_id()}: {exception}")

    def _remove_completed_jobs(self, partition: AsyncPartition) -> None:
        """
        Free the job tracker allocation for completed jobs in the partition.

        Args:
            partition (AsyncPartition): The partition to process.
        """
        for job in partition.jobs:
            if job.status() == AsyncJobStatus.COMPLETED:
                self._job_tracker.remove_job(job.api_job_id())

    def _reallocate_partition(
        self,
        current_running_partitions: List[AsyncPartition],
        partition: AsyncPartition,
    ) -> None:
        """
        Re-queue the partition at the front of the running partitions so that its failed jobs
        are retried in priority on the next `_start_jobs` pass.
        Args:
            current_running_partitions (list): The list of currently running partitions.
            partition (AsyncPartition): The partition to reallocate.
        """
        current_running_partitions.insert(0, partition)

    def _process_partitions_with_errors(self, partition: AsyncPartition) -> None:
        """
        Record a non-breaking error for a partition whose jobs ended in an error state (FAILED or TIMED_OUT).

        Args:
            partition (AsyncPartition): The partition to process.
        """
        status_by_job_id = {job.api_job_id(): job.status() for job in partition.jobs}
        self._non_breaking_exceptions.append(
            AirbyteTracedException(
                message="Async job failed after exhausting all retry attempts.",
                internal_message=f"At least one job could not be completed for slice {partition.stream_slice}. Job statuses were: {status_by_job_id}. See warning logs for more information.",
                failure_type=FailureType.system_error,
            )
        )

    def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
        """
        Creates and retrieves completed partitions.
        This method continuously starts jobs, updates job statuses, processes running partitions,
        and waits between polls. It yields completed partitions as they become available.

        Returns:
            An iterable of completed partitions, represented as AsyncPartition objects.
        """
        while True:
            try:
                lazy_log(
                    LOGGER,
                    logging.DEBUG,
                    lambda: f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) is starting the async job loop",
                )
                self._start_jobs()
                if not self._slice_iterator.has_next() and not self._running_partitions:
                    break

                self._update_jobs_status()
                yield from self._process_running_partitions_and_yield_completed_ones()
                self._wait_on_status_update()
            except Exception as exception:
                LOGGER.warning(
                    f"Caught exception that stops the processing of the jobs: {exception}. Traceback: {traceback.format_exc()}"
                )
                if self._is_breaking_exception(exception):
                    self._abort_all_running_jobs()
                    raise exception

                self._non_breaking_exceptions.append(exception)

        LOGGER.info(
            f"JobOrchestrator loop - (Thread {threading.get_native_id()}, AsyncJobOrchestrator {self}) completed! Errors during creation were {self._non_breaking_exceptions}"
        )
        if self._non_breaking_exceptions:
            # We emitted traced messages but we didn't break on non-breaking exceptions. We still need to raise an exception so that the
            # caller of `create_and_get_completed_partitions` knows that there was an issue with some partitions and the sync is incomplete.
            raise AirbyteTracedException(
                message="One or more async jobs failed after exhausting all retry attempts.",
                internal_message="\n".join(
                    [
                        filter_secrets(exception.__repr__())
                        for exception in self._non_breaking_exceptions
                    ]
                ),
                failure_type=FailureType.system_error,
            )

    def _handle_non_breaking_error(self, exception: Exception) -> None:
        LOGGER.error(f"Failed to start the Job: {exception}, traceback: {traceback.format_exc()}")
        self._non_breaking_exceptions.append(exception)

    def _abort_all_running_jobs(self) -> None:
        for partition in self._running_partitions:
            for job in partition.jobs:
                if job.status() in self._RUNNING_ON_API_SIDE_STATUS:
                    self._abort_job(job, free_job_allocation=True)
                self._job_tracker.remove_job(job.api_job_id())

        self._running_partitions = []

    def _is_breaking_exception(self, exception: Exception) -> bool:
        return isinstance(exception, self._exceptions_to_break_on) or (
            isinstance(exception, AirbyteTracedException)
            and exception.failure_type == FailureType.config_error
        )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        Fetches records from the given jobs, deleting each job once its records have been fetched.

        Args:
            async_jobs (Iterable[AsyncJob]): The AsyncJobs to fetch records from.

        Yields:
            Mapping[str, Any]: The fetched records from the jobs.
        """
        for job in async_jobs:
            yield from self._job_repository.fetch_records(job)
            self._job_repository.delete(job)
class AsyncPartition:
This bucket of api_jobs is not very useful for this iteration but should become interesting once we are able to split jobs.
AsyncPartition(jobs: List[AsyncJob], stream_slice: StreamSlice, job_max_retry: Optional[int] = None)
def replace_job(self, job_to_replace: AsyncJob, new_jobs: List[AsyncJob]) -> None:
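A minimal sketch of the retry accounting, using stand-in jobs (the `Mock` objects below are hypothetical placeholders; real `AsyncJob` instances come from the repository):

    from unittest.mock import Mock

    from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition

    old_job, new_job = Mock(), Mock()
    partition = AsyncPartition([old_job], stream_slice=Mock(), job_max_retry=2)

    # Replacing a job carries the attempt count over, incremented by one.
    partition.replace_job(old_job, [new_job])
    assert partition.has_reached_max_attempt()  # new_job is on attempt 2, which is >= job_max_retry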
def should_split(self, job: AsyncJob) -> bool:
Not used right now, but once we support job splitting, we should split based on the number of attempts.
status: AsyncJobStatus
Given different job statuses, the priority is: FAILED, TIMED_OUT, RUNNING. Otherwise, everything is completed or skipped. A partition is SKIPPED only when all jobs are SKIPPED; a mix of COMPLETED and SKIPPED resolves to COMPLETED so that records are fetched for the completed jobs.
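A short sketch of how mixed statuses resolve, again with hypothetical stand-in jobs:

    from unittest.mock import Mock

    from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncPartition
    from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus

    completed, skipped = Mock(), Mock()
    completed.status.return_value = AsyncJobStatus.COMPLETED
    skipped.status.return_value = AsyncJobStatus.SKIPPED

    partition = AsyncPartition([completed, skipped], stream_slice=Mock())
    # A mix of COMPLETED and SKIPPED resolves to COMPLETED so records are fetched for the completed jobs.
    assert partition.status == AsyncJobStatus.COMPLETED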
class LookaheadIterator(Generic[T]):
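LookaheadIterator wraps a plain iterator with a one-item buffer so the orchestrator can peek ahead (`has_next`) and push a slice back to the front (`add_at_the_beginning`) when job creation hits the concurrency limit. A minimal usage sketch:

    from airbyte_cdk.sources.declarative.async_job.job_orchestrator import LookaheadIterator

    slices = LookaheadIterator(["slice_a", "slice_b"])
    assert slices.has_next()                 # buffers "slice_a" without consuming it
    assert next(slices) == "slice_a"         # pops the buffered item first
    slices.add_at_the_beginning("slice_a")   # put it back, e.g. after ConcurrentJobLimitReached
    assert next(slices) == "slice_a"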
class AsyncJobOrchestrator:
AsyncJobOrchestrator(
    job_repository: AsyncJobRepository,
    slices: Iterable[StreamSlice],
    job_tracker: JobTracker,
    message_repository: MessageRepository,
    exceptions_to_break_on: Iterable[Type[Exception]] = (),
    has_bulk_parent: bool = False,
    job_max_retry: Optional[int] = None,
    failed_retry_wait_time_in_seconds: Optional[int] = None,
)
If the stream slices provided as a parameter rely on async job streams that rely on the same JobTracker, `has_bulk_parent`
needs to be set to True as job creation needs to be prioritized at the parent level. Doing otherwise could lead to a situation
where the child has taken up all the job budget without room for the parent to create more, which would lead to an infinite loop of
"trying to start a parent job" and "ConcurrentJobLimitReached".
def create_and_get_completed_partitions(self) -> Iterable[AsyncPartition]:
Creates and retrieves completed partitions. This method continuously starts jobs, updates job statuses, processes running partitions, and waits between polls. It yields completed partitions as they become available.
Returns:
An iterable of completed partitions, represented as AsyncPartition objects. If any partition exhausted its retry attempts, an AirbyteTracedException is raised once the loop completes so the caller knows the sync is incomplete.
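Putting it together, a caller typically drains the generator and fetches records per completed partition (a sketch, assuming the `orchestrator` wired above):

    for partition in orchestrator.create_and_get_completed_partitions():
        for record in orchestrator.fetch_records(partition.jobs):
            ...  # hand each record to the stream's record processing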
def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
Fetches records from the given jobs, deleting each job once its records have been fetched.
Arguments:
- async_jobs (Iterable[AsyncJob]): The AsyncJobs to fetch records from.
Yields:
Mapping[str, Any]: The fetched records from the jobs, one mapping at a time.