airbyte_cdk.sources.declarative.async_job.timer

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2from datetime import datetime, timedelta, timezone
 3from typing import Optional
 4
 5
 6class Timer:
 7    def __init__(self, timeout: timedelta) -> None:
 8        self._start_datetime: Optional[datetime] = None
 9        self._end_datetime: Optional[datetime] = None
10        self._timeout = timeout
11
12    def start(self) -> None:
13        self._start_datetime = self._now()
14        self._end_datetime = None
15
16    def stop(self) -> None:
17        if self._end_datetime is None:
18            self._end_datetime = self._now()
19
20    def is_started(self) -> bool:
21        return self._start_datetime is not None
22
23    @property
24    def elapsed_time(self) -> Optional[timedelta]:
25        if not self._start_datetime:
26            return None
27
28        end_time = self._end_datetime or self._now()
29        elapsed_period = end_time - self._start_datetime
30        return elapsed_period
31
32    def has_timed_out(self) -> bool:
33        if not self.is_started():
34            return False
35        return self.elapsed_time > self._timeout  # type: ignore  # given the job timer is started, we assume there is an elapsed_period
36
37    @staticmethod
38    def _now() -> datetime:
39        return datetime.now(tz=timezone.utc)
class Timer:
 7class Timer:
 8    def __init__(self, timeout: timedelta) -> None:
 9        self._start_datetime: Optional[datetime] = None
10        self._end_datetime: Optional[datetime] = None
11        self._timeout = timeout
12
13    def start(self) -> None:
14        self._start_datetime = self._now()
15        self._end_datetime = None
16
17    def stop(self) -> None:
18        if self._end_datetime is None:
19            self._end_datetime = self._now()
20
21    def is_started(self) -> bool:
22        return self._start_datetime is not None
23
24    @property
25    def elapsed_time(self) -> Optional[timedelta]:
26        if not self._start_datetime:
27            return None
28
29        end_time = self._end_datetime or self._now()
30        elapsed_period = end_time - self._start_datetime
31        return elapsed_period
32
33    def has_timed_out(self) -> bool:
34        if not self.is_started():
35            return False
36        return self.elapsed_time > self._timeout  # type: ignore  # given the job timer is started, we assume there is an elapsed_period
37
38    @staticmethod
39    def _now() -> datetime:
40        return datetime.now(tz=timezone.utc)
Timer(timeout: datetime.timedelta)
 8    def __init__(self, timeout: timedelta) -> None:
 9        self._start_datetime: Optional[datetime] = None
10        self._end_datetime: Optional[datetime] = None
11        self._timeout = timeout
def start(self) -> None:
13    def start(self) -> None:
14        self._start_datetime = self._now()
15        self._end_datetime = None
def stop(self) -> None:
17    def stop(self) -> None:
18        if self._end_datetime is None:
19            self._end_datetime = self._now()
def is_started(self) -> bool:
21    def is_started(self) -> bool:
22        return self._start_datetime is not None
elapsed_time: Optional[datetime.timedelta]
24    @property
25    def elapsed_time(self) -> Optional[timedelta]:
26        if not self._start_datetime:
27            return None
28
29        end_time = self._end_datetime or self._now()
30        elapsed_period = end_time - self._start_datetime
31        return elapsed_period
def has_timed_out(self) -> bool:
33    def has_timed_out(self) -> bool:
34        if not self.is_started():
35            return False
36        return self.elapsed_time > self._timeout  # type: ignore  # given the job timer is started, we assume there is an elapsed_period