airbyte_cdk.sources.declarative.async_job.timer
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from datetime import datetime, timedelta, timezone
from typing import Optional


class Timer:
    """Stopwatch that compares its elapsed time against a fixed timeout.

    Timestamps come from ``_now()``, which returns timezone-aware UTC
    datetimes. Until ``start()`` is called the timer reports no elapsed
    time and never counts as timed out.
    """

    def __init__(self, timeout: timedelta) -> None:
        """Create an unstarted timer.

        Args:
            timeout: Duration that the elapsed time must strictly exceed
                for ``has_timed_out()`` to return True.
        """
        self._timeout = timeout
        self._start_datetime: Optional[datetime] = None
        self._end_datetime: Optional[datetime] = None

    def start(self) -> None:
        """Record the current time as the start and clear any recorded end."""
        self._end_datetime = None
        self._start_datetime = self._now()

    def stop(self) -> None:
        """Record the current time as the end, unless an end is already set."""
        if self._end_datetime is not None:
            # Keep the first recorded end time; repeated calls are no-ops.
            return
        self._end_datetime = self._now()

    def is_started(self) -> bool:
        """Return True once a start time has been recorded."""
        started_at = self._start_datetime
        return started_at is not None

    @property
    def elapsed_time(self) -> Optional[timedelta]:
        """Time between the start and the end (or now, if no end is recorded).

        Returns:
            None when no start time has been recorded, otherwise the
            difference between the end reference and the start time.
        """
        began = self._start_datetime
        if not began:
            return None

        # With no recorded end, measure against the current time.
        finished = self._end_datetime if self._end_datetime else self._now()
        return finished - began

    def has_timed_out(self) -> bool:
        """Return True when started and the elapsed time exceeds the timeout."""
        if self.is_started():
            # elapsed_time is not None here because a start time is recorded.
            return self.elapsed_time > self._timeout  # type: ignore
        return False

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)
class
Timer:
class Timer:
    """Stopwatch that compares its elapsed time against a fixed timeout.

    Timestamps come from ``_now()``, which returns timezone-aware UTC
    datetimes. Until ``start()`` is called the timer reports no elapsed
    time and never counts as timed out.
    """

    def __init__(self, timeout: timedelta) -> None:
        """Create an unstarted timer.

        Args:
            timeout: Duration that the elapsed time must strictly exceed
                for ``has_timed_out()`` to return True.
        """
        self._timeout = timeout
        self._start_datetime: Optional[datetime] = None
        self._end_datetime: Optional[datetime] = None

    def start(self) -> None:
        """Record the current time as the start and clear any recorded end."""
        self._end_datetime = None
        self._start_datetime = self._now()

    def stop(self) -> None:
        """Record the current time as the end, unless an end is already set."""
        if self._end_datetime is not None:
            # Keep the first recorded end time; repeated calls are no-ops.
            return
        self._end_datetime = self._now()

    def is_started(self) -> bool:
        """Return True once a start time has been recorded."""
        started_at = self._start_datetime
        return started_at is not None

    @property
    def elapsed_time(self) -> Optional[timedelta]:
        """Time between the start and the end (or now, if no end is recorded).

        Returns:
            None when no start time has been recorded, otherwise the
            difference between the end reference and the start time.
        """
        began = self._start_datetime
        if not began:
            return None

        # With no recorded end, measure against the current time.
        finished = self._end_datetime if self._end_datetime else self._now()
        return finished - began

    def has_timed_out(self) -> bool:
        """Return True when started and the elapsed time exceeds the timeout."""
        if self.is_started():
            # elapsed_time is not None here because a start time is recorded.
            return self.elapsed_time > self._timeout  # type: ignore
        return False

    @staticmethod
    def _now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        return datetime.now(timezone.utc)