airbyte_cdk.sources.declarative.async_job.job
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import datetime, timedelta, timezone
from typing import Optional

from airbyte_cdk.sources.declarative.async_job.timer import Timer
from airbyte_cdk.sources.types import StreamSlice

from .status import AsyncJobStatus


class AsyncJob:
    """
    Description of an API job.

    The timer only stops once `update_status` is called with a terminal status, so the
    job may already be complete on the API side; until we query for it and call
    `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API-side status.
    """

    def __init__(
        self,
        api_job_id: str,
        job_parameters: StreamSlice,
        timeout: Optional[timedelta] = None,
        is_creation_failure: bool = False,
    ) -> None:
        """
        Args:
            api_job_id: identifier of the job on the API side.
            job_parameters: parameters of the job, returned unchanged by `job_parameters()`.
            timeout: how long the timer runs before `status()` reports TIMED_OUT. Any
                falsy value (None, or a zero `timedelta`) falls back to 60 minutes.
            is_creation_failure: True when the job was never actually created on the API side.
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._is_creation_failure = is_creation_failure
        # Every job starts as RUNNING; later statuses arrive through `update_status`.
        self._status = AsyncJobStatus.RUNNING
        # None means no deferred retry has been scheduled yet.
        self._retry_after: Optional[datetime] = None

        self._timer = Timer(timeout or timedelta(minutes=60))
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the identifier of this job on the API side."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """
        Return the last recorded status, or TIMED_OUT once the timer has expired.

        TODO: some APIs can themselves report a `Timeout` status; in that case we should
        not blindly return TIMED_OUT here but act based on the scenario. The default
        behavior is to report TIMED_OUT and retry.
        """
        expired = self._timer.has_timed_out()
        return AsyncJobStatus.TIMED_OUT if expired else self._status

    def job_parameters(self) -> StreamSlice:
        """Return the parameters this job was constructed with."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Record `status`, starting or stopping the timer as needed.

        Going from a non-RUNNING status back to RUNNING calls `start()` on the timer
        again; moving to a terminal status stops it.
        """
        resuming = status == AsyncJobStatus.RUNNING and self._status != AsyncJobStatus.RUNNING
        if resuming:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()
        self._status = status

    def is_creation_failure(self) -> bool:
        """Tell whether the job never made it onto the API side at all."""
        return self._is_creation_failure

    def set_retry_after(self, retry_after: datetime) -> None:
        """Defer any retry of this job until `retry_after` has passed."""
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        """Tell whether `set_retry_after` has scheduled a deferred retry."""
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        """
        Return True when no retry is deferred or the deferral time has been reached.

        NOTE(review): the deadline is compared against a timezone-aware UTC "now"; a naive
        `retry_after` would make this comparison raise TypeError.
        """
        deadline = self._retry_after
        return deadline is None or datetime.now(tz=timezone.utc) >= deadline

    def __repr__(self) -> str:
        return (
            f"AsyncJob(api_job_id={self.api_job_id()}, "
            f"job_parameters={self.job_parameters()}, "
            f"status={self.status()})"
        )
class
AsyncJob:
class AsyncJob:
    """
    Description of an API job.

    The timer only stops once `update_status` is called with a terminal status, so the
    job may already be complete on the API side; until we query for it and call
    `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API-side status.
    """

    def __init__(
        self,
        api_job_id: str,
        job_parameters: StreamSlice,
        timeout: Optional[timedelta] = None,
        is_creation_failure: bool = False,
    ) -> None:
        """
        Args:
            api_job_id: identifier of the job on the API side.
            job_parameters: parameters of the job, returned unchanged by `job_parameters()`.
            timeout: how long the timer runs before `status()` reports TIMED_OUT. Any
                falsy value (None, or a zero `timedelta`) falls back to 60 minutes.
            is_creation_failure: True when the job was never actually created on the API side.
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._is_creation_failure = is_creation_failure
        # Every job starts as RUNNING; later statuses arrive through `update_status`.
        self._status = AsyncJobStatus.RUNNING
        # None means no deferred retry has been scheduled yet.
        self._retry_after: Optional[datetime] = None

        self._timer = Timer(timeout or timedelta(minutes=60))
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the identifier of this job on the API side."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """
        Return the last recorded status, or TIMED_OUT once the timer has expired.

        TODO: some APIs can themselves report a `Timeout` status; in that case we should
        not blindly return TIMED_OUT here but act based on the scenario. The default
        behavior is to report TIMED_OUT and retry.
        """
        expired = self._timer.has_timed_out()
        return AsyncJobStatus.TIMED_OUT if expired else self._status

    def job_parameters(self) -> StreamSlice:
        """Return the parameters this job was constructed with."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Record `status`, starting or stopping the timer as needed.

        Going from a non-RUNNING status back to RUNNING calls `start()` on the timer
        again; moving to a terminal status stops it.
        """
        resuming = status == AsyncJobStatus.RUNNING and self._status != AsyncJobStatus.RUNNING
        if resuming:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()
        self._status = status

    def is_creation_failure(self) -> bool:
        """Tell whether the job never made it onto the API side at all."""
        return self._is_creation_failure

    def set_retry_after(self, retry_after: datetime) -> None:
        """Defer any retry of this job until `retry_after` has passed."""
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        """Tell whether `set_retry_after` has scheduled a deferred retry."""
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        """
        Return True when no retry is deferred or the deferral time has been reached.

        NOTE(review): the deadline is compared against a timezone-aware UTC "now"; a naive
        `retry_after` would make this comparison raise TypeError.
        """
        deadline = self._retry_after
        return deadline is None or datetime.now(tz=timezone.utc) >= deadline

    def __repr__(self) -> str:
        return (
            f"AsyncJob(api_job_id={self.api_job_id()}, "
            f"job_parameters={self.job_parameters()}, "
            f"status={self.status()})"
        )
Description of an API job.
Note that the timer only stops once update_status is called, so the job might already be completed on the API side; until we query for
it and call AsyncJob.update_status, AsyncJob.status will not reflect the actual API-side status.
AsyncJob( api_job_id: str, job_parameters: airbyte_cdk.StreamSlice, timeout: Optional[datetime.timedelta] = None, is_creation_failure: bool = False)
def __init__(
    self,
    api_job_id: str,
    job_parameters: StreamSlice,
    timeout: Optional[timedelta] = None,
    is_creation_failure: bool = False,
) -> None:
    """
    Args:
        api_job_id: identifier of the job on the API side.
        job_parameters: parameters of the job, returned unchanged by `job_parameters()`.
        timeout: how long the timer runs before `status()` reports TIMED_OUT. Any
            falsy value (None, or a zero `timedelta`) falls back to 60 minutes.
        is_creation_failure: True when the job was never actually created on the API side.
    """
    self._api_job_id = api_job_id
    self._job_parameters = job_parameters
    self._is_creation_failure = is_creation_failure
    # Every job starts as RUNNING; later statuses arrive through `update_status`.
    self._status = AsyncJobStatus.RUNNING
    # None means no deferred retry has been scheduled yet.
    self._retry_after: Optional[datetime] = None

    self._timer = Timer(timeout or timedelta(minutes=60))
    self._timer.start()
def status(self) -> AsyncJobStatus:
    """
    Return the last recorded status, or TIMED_OUT once the timer has expired.

    TODO: some APIs can themselves report a `Timeout` status; in that case we should
    not blindly return TIMED_OUT here but act based on the scenario. The default
    behavior is to report TIMED_OUT and retry.
    """
    expired = self._timer.has_timed_out()
    return AsyncJobStatus.TIMED_OUT if expired else self._status
def
update_status( self, status: airbyte_cdk.sources.declarative.async_job.status.AsyncJobStatus) -> None:
def
is_creation_failure(self) -> bool:
def is_creation_failure(self) -> bool:
    """Tell whether the job never made it onto the API side at all."""
    flag = self._is_creation_failure
    return flag
Return True if this job was never actually created on the API side.
def
set_retry_after(self, retry_after: datetime.datetime) -> None:
def set_retry_after(self, retry_after: datetime) -> None:
    """Defer any retry of this job until `retry_after` has passed."""
    self._retry_after = retry_after
Set the earliest time this job can be retried.
def
retry_deferred(self) -> bool:
def retry_deferred(self) -> bool:
    """Tell whether `set_retry_after` has scheduled a deferred retry."""
    scheduled = self._retry_after
    return scheduled is not None
Return True if a deferred retry has been scheduled.
def
ready_to_retry(self) -> bool:
def ready_to_retry(self) -> bool:
    """
    Return True when no retry is deferred or the deferral time has been reached.

    NOTE(review): the deadline is compared against a timezone-aware UTC "now"; a naive
    `retry_after` would make this comparison raise TypeError.
    """
    deadline = self._retry_after
    return deadline is None or datetime.now(tz=timezone.utc) >= deadline
Return True if the job has no deferred retry or the wait period has elapsed.