airbyte_cdk.sources.declarative.async_job.job

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3
 4from datetime import datetime, timedelta, timezone
 5from typing import Optional
 6
 7from airbyte_cdk.sources.declarative.async_job.timer import Timer
 8from airbyte_cdk.sources.types import StreamSlice
 9
10from .status import AsyncJobStatus
11
12
class AsyncJob:
    """
    Description of an API job.

    A job is created in the `RUNNING` status and its timer is started right away. Note that the timer will only stop once
    `update_status` is called with a terminal status, so the job might be completed on the API side but until we query for it and
    call `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API side status.
    """

    def __init__(
        self,
        api_job_id: str,
        job_parameters: StreamSlice,
        timeout: Optional[timedelta] = None,
        is_creation_failure: bool = False,
    ) -> None:
        """
        Build the job in the `RUNNING` status and start its timer.

        `timeout` is handed to the `Timer`; when it is `None` (or any falsy value such as `timedelta(0)`), 60 minutes is used.
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._status = AsyncJobStatus.RUNNING
        self._retry_after: Optional[datetime] = None
        self._is_creation_failure = is_creation_failure

        timeout = timeout if timeout else timedelta(minutes=60)
        self._timer = Timer(timeout)
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the job identifier given at construction."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """Return `TIMED_OUT` once the timer reports a timeout, otherwise the last status that was set."""
        if self._timer.has_timed_out():
            # TODO: we should account the fact that,
            # certain APIs could send the `Timeout` status,
            # thus we should not return `Timeout` in that case,
            # but act based on the scenario.

            # the default behavior is to return `Timeout` status and retry.
            return AsyncJobStatus.TIMED_OUT
        return self._status

    def job_parameters(self) -> StreamSlice:
        """Return the job parameters given at construction."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """Store the new status and start or stop the timer to match it."""
        if self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING:
            # the job goes back to RUNNING from another status: call `start()` on the timer again
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()

        self._status = status

    def is_creation_failure(self) -> bool:
        """Return True if this job was never actually created on the API side."""
        return self._is_creation_failure

    def set_retry_after(self, retry_after: datetime) -> None:
        """Set the earliest time this job can be retried."""
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        """Return True if a deferred retry has been scheduled."""
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        """
        Return True if the job has no deferred retry or the wait period has elapsed.

        The retry time is compared with the current UTC time, so it has to be timezone-aware: Python raises `TypeError` when a
        naive datetime is compared with an aware one.
        """
        if self._retry_after is None:
            return True
        return datetime.now(tz=timezone.utc) >= self._retry_after

    def __repr__(self) -> str:
        return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"
class AsyncJob:
class AsyncJob:
    """
    Description of an API job.

    A job is created in the `RUNNING` status and its timer is started right away. Note that the timer will only stop once
    `update_status` is called with a terminal status, so the job might be completed on the API side but until we query for it and
    call `AsyncJob.update_status`, `AsyncJob.status` will not reflect the actual API side status.
    """

    def __init__(
        self,
        api_job_id: str,
        job_parameters: StreamSlice,
        timeout: Optional[timedelta] = None,
        is_creation_failure: bool = False,
    ) -> None:
        """
        Build the job in the `RUNNING` status and start its timer.

        `timeout` is handed to the `Timer`; when it is `None` (or any falsy value such as `timedelta(0)`), 60 minutes is used.
        """
        self._api_job_id = api_job_id
        self._job_parameters = job_parameters
        self._status = AsyncJobStatus.RUNNING
        self._retry_after: Optional[datetime] = None
        self._is_creation_failure = is_creation_failure

        timeout = timeout if timeout else timedelta(minutes=60)
        self._timer = Timer(timeout)
        self._timer.start()

    def api_job_id(self) -> str:
        """Return the job identifier given at construction."""
        return self._api_job_id

    def status(self) -> AsyncJobStatus:
        """Return `TIMED_OUT` once the timer reports a timeout, otherwise the last status that was set."""
        if self._timer.has_timed_out():
            # TODO: we should account the fact that,
            # certain APIs could send the `Timeout` status,
            # thus we should not return `Timeout` in that case,
            # but act based on the scenario.

            # the default behavior is to return `Timeout` status and retry.
            return AsyncJobStatus.TIMED_OUT
        return self._status

    def job_parameters(self) -> StreamSlice:
        """Return the job parameters given at construction."""
        return self._job_parameters

    def update_status(self, status: AsyncJobStatus) -> None:
        """Store the new status and start or stop the timer to match it."""
        if self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING:
            # the job goes back to RUNNING from another status: call `start()` on the timer again
            self._timer.start()
        elif status.is_terminal():
            self._timer.stop()

        self._status = status

    def is_creation_failure(self) -> bool:
        """Return True if this job was never actually created on the API side."""
        return self._is_creation_failure

    def set_retry_after(self, retry_after: datetime) -> None:
        """Set the earliest time this job can be retried."""
        self._retry_after = retry_after

    def retry_deferred(self) -> bool:
        """Return True if a deferred retry has been scheduled."""
        return self._retry_after is not None

    def ready_to_retry(self) -> bool:
        """
        Return True if the job has no deferred retry or the wait period has elapsed.

        The retry time is compared with the current UTC time, so it has to be timezone-aware: Python raises `TypeError` when a
        naive datetime is compared with an aware one.
        """
        if self._retry_after is None:
            return True
        return datetime.now(tz=timezone.utc) >= self._retry_after

    def __repr__(self) -> str:
        return f"AsyncJob(api_job_id={self.api_job_id()}, job_parameters={self.job_parameters()}, status={self.status()})"

Description of an API job.

Note that the timer will only stop once update_status is called, so the job might already be completed on the API side; until we query for it and call AsyncJob.update_status, AsyncJob.status will not reflect the actual API-side status.

AsyncJob( api_job_id: str, job_parameters: airbyte_cdk.StreamSlice, timeout: Optional[datetime.timedelta] = None, is_creation_failure: bool = False)
def __init__(
    self,
    api_job_id: str,
    job_parameters: StreamSlice,
    timeout: Optional[timedelta] = None,
    is_creation_failure: bool = False,
) -> None:
    """
    Build the job in the `RUNNING` status and start its timer.

    `timeout` is handed to the `Timer`; when it is `None` (or any falsy value such as `timedelta(0)`), 60 minutes is used.
    """
    self._api_job_id = api_job_id
    self._job_parameters = job_parameters
    self._status = AsyncJobStatus.RUNNING
    self._retry_after: Optional[datetime] = None
    self._is_creation_failure = is_creation_failure

    timeout = timeout if timeout else timedelta(minutes=60)
    self._timer = Timer(timeout)
    self._timer.start()
def api_job_id(self) -> str:
def api_job_id(self) -> str:
    """Return the job identifier stored on the instance."""
    return self._api_job_id
def status(self) -> AsyncJobStatus:
    """Return `TIMED_OUT` once the timer reports a timeout, otherwise the last status that was set."""
    if self._timer.has_timed_out():
        # TODO: we should account the fact that,
        # certain APIs could send the `Timeout` status,
        # thus we should not return `Timeout` in that case,
        # but act based on the scenario.

        # the default behavior is to return `Timeout` status and retry.
        return AsyncJobStatus.TIMED_OUT
    return self._status
def job_parameters(self) -> airbyte_cdk.StreamSlice:
def job_parameters(self) -> StreamSlice:
    """Return the job parameters stored on the instance."""
    return self._job_parameters
def update_status( self, status: airbyte_cdk.sources.declarative.async_job.status.AsyncJobStatus) -> None:
def update_status(self, status: AsyncJobStatus) -> None:
    """Store the new status and start or stop the timer to match it."""
    if self._status != AsyncJobStatus.RUNNING and status == AsyncJobStatus.RUNNING:
        # the job goes back to RUNNING from another status: call `start()` on the timer again
        self._timer.start()
    elif status.is_terminal():
        self._timer.stop()

    self._status = status
def is_creation_failure(self) -> bool:
def is_creation_failure(self) -> bool:
    """Return True if this job was never actually created on the API side."""
    return self._is_creation_failure

Return True if this job was never actually created on the API side.

def set_retry_after(self, retry_after: datetime.datetime) -> None:
def set_retry_after(self, retry_after: datetime) -> None:
    """Set the earliest time this job can be retried."""
    self._retry_after = retry_after

Set the earliest time this job can be retried.

def retry_deferred(self) -> bool:
def retry_deferred(self) -> bool:
    """Return True if a deferred retry has been scheduled."""
    return self._retry_after is not None

Return True if a deferred retry has been scheduled.

def ready_to_retry(self) -> bool:
def ready_to_retry(self) -> bool:
    """
    Return True if the job has no deferred retry or the wait period has elapsed.

    The retry time is compared with the current UTC time, so it has to be timezone-aware: Python raises `TypeError` when a
    naive datetime is compared with an aware one.
    """
    if self._retry_after is None:
        return True
    return datetime.now(tz=timezone.utc) >= self._retry_after

Return True if the job has no deferred retry or the wait period has elapsed.