airbyte_cdk.sources.declarative.async_job.job
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.


from datetime import timedelta
from typing import Optional

from airbyte_cdk.sources.declarative.async_job.timer import Timer
from airbyte_cdk.sources.types import StreamSlice

from .status import AsyncJobStatus


class AsyncJob:
    """
    Local handle on a job that runs on a remote API.

    The status stored here is only as fresh as the last `update_status` call: the job may already be finished on the API side, but
    until the API is queried and `AsyncJob.update_status` is called, `AsyncJob.status()` will not reflect the actual API side status.
    The timer is likewise only stopped by such a call.
    """

    def __init__(
        self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
    ) -> None:
        """
        Create a job in the RUNNING state and start its timeout timer.

        :param api_job_id: identifier of the job, as returned by `api_job_id()`
        :param job_parameters: stream slice the job was created for, as returned by `job_parameters()`
        :param timeout: maximum duration handed to the timer; any falsy value (`None` or a zero `timedelta`) selects 60 minutes
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._status = AsyncJobStatus.RUNNING

        # `or` keeps the truthiness-based fallback: a falsy timeout means "use the default".
        self._timer = Timer(timeout or timedelta(minutes=60))
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the job identifier given at construction."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """Return the last recorded status, or `TIMED_OUT` once the timer reports a timeout."""
        if not self._timer.has_timed_out():
            return self._status

        # TODO: certain APIs could send the `Timeout` status themselves; in that case we should
        # not return `Timeout` here but act based on the scenario.
        # The default behavior is to return the `Timeout` status and retry.
        return AsyncJobStatus.TIMED_OUT

    def job_parameters(self) -> StreamSlice:
        """Return the stream slice given at construction."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Record `status` as the current status of the job.

        The timer is restarted when the job goes back to RUNNING from another status, and stopped when `status` is terminal.
        """
        back_to_running = (
            self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING
        )
        if back_to_running:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()

        self._status = status

    def __repr__(self) -> str:
        return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"
class
AsyncJob:
class AsyncJob:
    """
    Local handle on a job that runs on a remote API.

    The status stored here is only as fresh as the last `update_status` call: the job may already be finished on the API side, but
    until the API is queried and `AsyncJob.update_status` is called, `AsyncJob.status()` will not reflect the actual API side status.
    The timer is likewise only stopped by such a call.
    """

    def __init__(
        self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
    ) -> None:
        """
        Create a job in the RUNNING state and start its timeout timer.

        :param api_job_id: identifier of the job, as returned by `api_job_id()`
        :param job_parameters: stream slice the job was created for, as returned by `job_parameters()`
        :param timeout: maximum duration handed to the timer; any falsy value (`None` or a zero `timedelta`) selects 60 minutes
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._status = AsyncJobStatus.RUNNING

        # `or` keeps the truthiness-based fallback: a falsy timeout means "use the default".
        self._timer = Timer(timeout or timedelta(minutes=60))
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the job identifier given at construction."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """Return the last recorded status, or `TIMED_OUT` once the timer reports a timeout."""
        if not self._timer.has_timed_out():
            return self._status

        # TODO: certain APIs could send the `Timeout` status themselves; in that case we should
        # not return `Timeout` here but act based on the scenario.
        # The default behavior is to return the `Timeout` status and retry.
        return AsyncJobStatus.TIMED_OUT

    def job_parameters(self) -> StreamSlice:
        """Return the stream slice given at construction."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Record `status` as the current status of the job.

        The timer is restarted when the job goes back to RUNNING from another status, and stopped when `status` is terminal.
        """
        back_to_running = (
            self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING
        )
        if back_to_running:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()

        self._status = status

    def __repr__(self) -> str:
        return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"
Description of an API job.
Note that the timer only stops once `update_status` is called. The job may therefore already be completed on the API side, but until we query for it and call `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API side status.
AsyncJob( api_job_id: str, job_parameters: airbyte_cdk.StreamSlice, timeout: Optional[datetime.timedelta] = None)
def __init__(
    self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
) -> None:
    """
    Create a job in the RUNNING state and start its timeout timer.

    :param api_job_id: identifier of the job, stored as `_api_job_id`
    :param job_parameters: stream slice the job was created for, stored as `_job_parameters`
    :param timeout: maximum duration handed to the timer; any falsy value (`None` or a zero `timedelta`) selects 60 minutes
    """
    self._api_job_id = api_job_id
    self._job_parameters = job_parameters
    self._status = AsyncJobStatus.RUNNING

    # `or` keeps the truthiness-based fallback: a falsy timeout means "use the default".
    self._timer = Timer(timeout or timedelta(minutes=60))
    self._timer.start()
def status(self) -> AsyncJobStatus:
    """Return the last recorded status, or `TIMED_OUT` once the timer reports a timeout."""
    if not self._timer.has_timed_out():
        return self._status

    # TODO: certain APIs could send the `Timeout` status themselves; in that case we should
    # not return `Timeout` here but act based on the scenario.
    # The default behavior is to return the `Timeout` status and retry.
    return AsyncJobStatus.TIMED_OUT
def
update_status( self, status: airbyte_cdk.sources.declarative.async_job.status.AsyncJobStatus) -> None: