airbyte_cdk.sources.declarative.async_job.job

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3
 4from datetime import timedelta
 5from typing import Optional
 6
 7from airbyte_cdk.sources.declarative.async_job.timer import Timer
 8from airbyte_cdk.sources.types import StreamSlice
 9
10from .status import AsyncJobStatus
11
12
class AsyncJob:
    """
    Description of an API job.

    The job tracks its own timeout with a `Timer` that is started on construction. The timer is only
    stopped (or restarted) when `update_status` is called. The job might therefore already be completed
    on the API side, but until we query for it and call `AsyncJob.update_status`, `AsyncJob.status`
    will not reflect the actual API-side status.
    """

    def __init__(
        self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
    ) -> None:
        """
        Create the job in the RUNNING state and start its timeout timer.

        api_job_id: the job's identifier, returned as-is by `api_job_id()`.
        job_parameters: the stream slice attached to the job, returned as-is by `job_parameters()`.
        timeout: how long the timer may run before `status()` reports `TIMED_OUT`. Any falsy value
            (`None`, or a zero-length `timedelta`) falls back to 60 minutes.
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        # Every job starts as RUNNING; later transitions arrive through `update_status`.
        self._status = AsyncJobStatus.RUNNING

        # `or` falls back on any falsy timeout, mirroring a truthiness check rather than `is None`.
        effective_timeout = timeout or timedelta(minutes=60)
        self._timer = Timer(effective_timeout)
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the identifier this job was constructed with."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """
        Return the job's current status.

        If the timer reports a timeout, `TIMED_OUT` is returned regardless of the last status recorded
        through `update_status`.
        """
        # TODO: certain APIs could send a `Timeout` status themselves; in that case we should not
        #  blindly return `TIMED_OUT` here but act based on the scenario.
        # The default behavior is to return the `TIMED_OUT` status and retry.
        timed_out = self._timer.has_timed_out()
        return AsyncJobStatus.TIMED_OUT if timed_out else self._status

    def job_parameters(self) -> StreamSlice:
        """Return the stream slice this job was constructed with."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Record the latest status for this job and keep the timer in sync with it.

        - Moving from a non-RUNNING status back to RUNNING restarts the timer.
        - Otherwise, a status for which `status.is_terminal()` is true stops the timer.
        """
        is_restart = self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING
        if is_restart:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()

        self._status = status

    def __repr__(self) -> str:
        job_id = self.api_job_id()
        params = self.job_parameters()
        # Goes through `status()`, so a timed-out job is shown as TIMED_OUT.
        current = self.status()
        return f"AsyncJob(api_job_id={job_id}, job_parameters={params}, status={current})"
class AsyncJob:
    """
    Description of an API job.

    A `Timer` bounds how long the job may stay unfinished. That timer is only stopped (or restarted)
    inside `update_status`. The job might therefore be completed on the API side, but until we query
    for it and call `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API-side
    status.
    """

    def __init__(
        self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
    ) -> None:
        """
        Initialise the job as RUNNING and immediately start its timeout timer.

        api_job_id: identifier exposed through `api_job_id()`.
        job_parameters: stream slice exposed through `job_parameters()`.
        timeout: maximum running time before `status()` reports `TIMED_OUT`. Falls back to 60 minutes
            when falsy (`None` or a zero-length `timedelta`).
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._status = AsyncJobStatus.RUNNING

        if not timeout:
            # `None` and `timedelta(0)` are both falsy, so either one selects the default.
            timeout = timedelta(minutes=60)
        self._timer = Timer(timeout)
        self._timer.start()

    def api_job_id(self) -> str:
        """Identifier passed at construction time."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """
        Current status of the job; a timed-out timer overrides the recorded status with `TIMED_OUT`.
        """
        # TODO: certain APIs could send a `Timeout` status themselves, in which case we should not
        #  return `TIMED_OUT` unconditionally but act based on the scenario.
        # The default behavior is to return the `TIMED_OUT` status and retry.
        return self._status if not self._timer.has_timed_out() else AsyncJobStatus.TIMED_OUT

    def job_parameters(self) -> StreamSlice:
        """Stream slice passed at construction time."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """
        Store `status` as the job's recorded status, adjusting the timer first.

        A transition from a non-RUNNING status back to RUNNING restarts the timer. Otherwise a terminal
        status (as reported by `status.is_terminal()`) stops it.
        """
        resumed = self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING
        if resumed:
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()
        self._status = status

    def __repr__(self) -> str:
        job_id, params, current = self.api_job_id(), self.job_parameters(), self.status()
        return f"AsyncJob(api_job_id={job_id}, job_parameters={params}, status={current})"

Description of an API job.

Note that the timer only stops once `update_status` is called. The job might therefore already be completed on the API side, but until we query for it and call `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API-side status.

def __init__(
    self, api_job_id: str, job_parameters: StreamSlice, timeout: Optional[timedelta] = None
) -> None:
    """
    Construct an `AsyncJob` in the RUNNING state and start its timeout timer.

    api_job_id: identifier later returned by `api_job_id()`.
    job_parameters: stream slice later returned by `job_parameters()`.
    timeout: how long the timer may run before the job reports `TIMED_OUT`. Any falsy value
        (`None` or a zero-length `timedelta`) falls back to 60 minutes.
    """
    self._api_job_id = api_job_id
    self._job_parameters = job_parameters
    self._status = AsyncJobStatus.RUNNING

    # A truthiness fallback (not an `is None` check): `timedelta(0)` also selects the default.
    self._timer = Timer(timeout or timedelta(minutes=60))
    self._timer.start()
def api_job_id(self) -> str:
    """Return the job identifier supplied when the job was constructed."""
    job_id = self._api_job_id
    return job_id
def status(self) -> AsyncJobStatus:
    """
    Return the job's status, or `TIMED_OUT` if the timer reports that the timeout was exceeded.

    The timeout check takes precedence over whatever was last recorded through `update_status`.
    """
    # TODO: certain APIs could send a `Timeout` status themselves; in that case we should not
    #  return `TIMED_OUT` unconditionally but act based on the scenario.
    # The default behavior is to return the `TIMED_OUT` status and retry.
    timed_out = self._timer.has_timed_out()
    return AsyncJobStatus.TIMED_OUT if timed_out else self._status
def job_parameters(self) -> StreamSlice:
    """Return the stream slice supplied when the job was constructed."""
    params = self._job_parameters
    return params
def update_status(self, status: AsyncJobStatus) -> None:
    """
    Record `status` as the job's latest status, keeping the timeout timer in sync.

    - Going from a non-RUNNING status back to RUNNING restarts the timer.
    - Otherwise, a status whose `is_terminal()` is true stops the timer.
    """
    is_restart = self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING
    if is_restart:
        self._timer.start()
    elif status.is_terminal():
        self._timer.stop()

    self._status = status