airbyte_cdk.sources.streams.concurrent.clamping
from abc import ABC
from datetime import datetime, timedelta
from enum import Enum
from typing import Callable

from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType


class ClampingStrategy(ABC):
    """Base type for strategies that snap a cursor value onto a boundary.

    The ``clamp`` defined here always raises; the strategies below override it.
    """

    def clamp(self, value: CursorValueType) -> CursorValueType:
        """Return the clamped form of ``value``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()


class NoClamping(ClampingStrategy):
    """Strategy that performs no clamping."""

    def clamp(self, value: CursorValueType) -> CursorValueType:
        """Hand ``value`` back untouched."""
        return value


class ClampingEndProvider:
    """Callable producing a clamped end value moved back by ``granularity``."""

    def __init__(
        self,
        clamping_strategy: ClampingStrategy,
        end_provider: Callable[[], CursorValueType],
        granularity: timedelta,
    ) -> None:
        self._end_provider = end_provider
        self._clamping_strategy = clamping_strategy
        self._granularity = granularity

    def __call__(self) -> CursorValueType:
        """Fetch the end value, clamp it, then subtract the granularity."""
        raw_end = self._end_provider()
        clamped_end = self._clamping_strategy.clamp(raw_end)
        return clamped_end - self._granularity


class DayClampingStrategy(ClampingStrategy):
    """Snap a datetime onto a day boundary (midnight)."""

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight of ``value``'s day, or of the next day when ceiling.

        In ceiling mode one day is always added, including when ``value`` is
        already exactly at midnight.
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        return midnight + timedelta(days=1) if self._is_ceiling else midnight


class MonthClampingStrategy(ClampingStrategy):
    """Snap a datetime onto midnight of the first day of a month."""

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return the month boundary for ``value``.

        A value whose day is 1 is only truncated to midnight, whatever the
        ceiling flag says. Otherwise the result is the first day of the next
        month (ceiling) or of the current month (floor).
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        if value.day == 1:
            return midnight
        if self._is_ceiling:
            return self._ceil(midnight)
        return midnight.replace(day=1)

    def _ceil(self, value: datetime) -> datetime:
        """Return midnight on the first day of the month after ``value``'s."""
        if value.month == 12:
            # December rolls over into January of the following year.
            target_year, target_month = value.year + 1, 1
        else:
            target_year, target_month = value.year, value.month + 1
        return value.replace(
            year=target_year,
            month=target_month,
            day=1,
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
        )


class Weekday(Enum):
    """
    Days of the week, using the same integers as datetime.date.weekday() (Monday is 0, Sunday is 6)
    """

    MONDAY = 0
    TUESDAY = 1
    WEDNESDAY = 2
    THURSDAY = 3
    FRIDAY = 4
    SATURDAY = 5
    SUNDAY = 6


class WeekClampingStrategy(ClampingStrategy):
    """Snap a datetime onto midnight of a configured weekday."""

    def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
        self._day_of_week = day_of_week.value
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight on the target weekday relative to ``value``.

        Ceiling mode moves forward to the target weekday, staying on the same
        date when ``value`` already falls on it. Floor mode returns the date
        seven days before the ceiling date.
        """
        offset = value.weekday() - self._day_of_week
        # Past the target weekday: wrap around into the following week.
        days_to_target = 7 - offset if offset > 0 else abs(offset)
        shift = days_to_target if self._is_ceiling else days_to_target - 7
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        return midnight + timedelta(days=shift)
class
ClampingStrategy(abc.ABC):
class ClampingStrategy(ABC):
    """Base type for strategies that snap a cursor value onto a boundary.

    The ``clamp`` defined here always raises, so a usable strategy has to
    override it.
    """

    def clamp(self, value: CursorValueType) -> CursorValueType:
        """Return the clamped form of ``value``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
Base class for clamping strategies. Its `clamp` method raises `NotImplementedError`; the strategies in this module override it.
class NoClamping(ClampingStrategy):
    """Strategy that performs no clamping."""

    def clamp(self, value: CursorValueType) -> CursorValueType:
        """Hand ``value`` back untouched."""
        return value
Clamping strategy that returns the value it is given unchanged.
class
ClampingEndProvider:
class ClampingEndProvider:
    """Callable producing a clamped end value moved back by ``granularity``.

    Args:
        clamping_strategy: strategy whose ``clamp`` is applied to the end value.
        end_provider: zero-argument callable returning the raw end value.
        granularity: amount subtracted from the clamped value.
    """

    def __init__(
        self,
        clamping_strategy: ClampingStrategy,
        end_provider: Callable[[], CursorValueType],
        granularity: timedelta,
    ) -> None:
        self._end_provider = end_provider
        self._clamping_strategy = clamping_strategy
        self._granularity = granularity

    def __call__(self) -> CursorValueType:
        """Fetch the end value, clamp it, then subtract the granularity."""
        raw_end = self._end_provider()
        clamped_end = self._clamping_strategy.clamp(raw_end)
        return clamped_end - self._granularity
ClampingEndProvider( clamping_strategy: ClampingStrategy, end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], granularity: datetime.timedelta)
class DayClampingStrategy(ClampingStrategy):
    """Snap a datetime onto a day boundary (midnight).

    Args:
        is_ceiling: when true (the default) the result is midnight of the
            following day; otherwise midnight of the same day.
    """

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight of ``value``'s day, or of the next day when ceiling.

        In ceiling mode one day is always added, including when ``value`` is
        already exactly at midnight.
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        return midnight + timedelta(days=1) if self._is_ceiling else midnight
Clamping strategy that truncates a datetime to midnight and, when `is_ceiling` is true (the default), moves it to midnight of the following day.
class MonthClampingStrategy(ClampingStrategy):
    """Snap a datetime onto midnight of the first day of a month.

    Args:
        is_ceiling: when true (the default) round up to the next month;
            otherwise round down to the current month.
    """

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return the month boundary for ``value``.

        A value whose day is 1 is only truncated to midnight, whatever the
        ceiling flag says. Otherwise the result is the first day of the next
        month (ceiling) or of the current month (floor).
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        if value.day == 1:
            return midnight
        if self._is_ceiling:
            return self._ceil(midnight)
        return midnight.replace(day=1)

    def _ceil(self, value: datetime) -> datetime:
        """Return midnight on the first day of the month after ``value``'s."""
        if value.month == 12:
            # December rolls over into January of the following year.
            target_year, target_month = value.year + 1, 1
        else:
            target_year, target_month = value.year, value.month + 1
        return value.replace(
            year=target_year,
            month=target_month,
            day=1,
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
        )
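Clamping strategy that rounds a datetime to midnight on the first day of a month: the following month when `is_ceiling` is true (the default), otherwise the current month. A value that already falls on day 1 is only truncated to midnight.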
def
clamp(self, value: datetime.datetime) -> datetime.datetime:
def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
    """Return the month boundary for ``value``.

    A value whose day is 1 is only truncated to midnight, whatever the
    ceiling flag says. Otherwise the result comes from ``self._ceil`` in
    ceiling mode, or is the first day of the current month in floor mode.
    """
    midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
    if value.day == 1:
        return midnight
    if self._is_ceiling:
        return self._ceil(midnight)
    return midnight.replace(day=1)
class
Weekday(enum.Enum):
class Weekday(Enum):
    """
    Days of the week, using the same integers as datetime.date.weekday() (Monday is 0, Sunday is 6)
    """

    MONDAY = 0
    TUESDAY = 1
    WEDNESDAY = 2
    THURSDAY = 3
    FRIDAY = 4
    SATURDAY = 5
    SUNDAY = 6
These integer values match the ones returned by `datetime.date.weekday()` (Monday is 0, Sunday is 6).
MONDAY =
<Weekday.MONDAY: 0>
TUESDAY =
<Weekday.TUESDAY: 1>
WEDNESDAY =
<Weekday.WEDNESDAY: 2>
THURSDAY =
<Weekday.THURSDAY: 3>
FRIDAY =
<Weekday.FRIDAY: 4>
SATURDAY =
<Weekday.SATURDAY: 5>
SUNDAY =
<Weekday.SUNDAY: 6>
class WeekClampingStrategy(ClampingStrategy):
    """Snap a datetime onto midnight of a configured weekday.

    Args:
        day_of_week: the weekday to clamp onto; its ``.value`` is stored.
        is_ceiling: when true (the default) move forward to the target
            weekday; otherwise move to the date seven days before that.
    """

    def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
        self._day_of_week = day_of_week.value
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight on the target weekday relative to ``value``.

        Ceiling mode moves forward to the target weekday, staying on the same
        date when ``value`` already falls on it. Floor mode returns the date
        seven days before the ceiling date.
        """
        offset = value.weekday() - self._day_of_week
        # Past the target weekday: wrap around into the following week.
        days_to_target = 7 - offset if offset > 0 else abs(offset)
        shift = days_to_target if self._is_ceiling else days_to_target - 7
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        return midnight + timedelta(days=shift)
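Clamping strategy that moves a datetime to midnight on the configured `day_of_week`. When `is_ceiling` is true (the default) it moves forward to the next occurrence of that weekday, or stays on the same date if the value already falls on it; otherwise it returns the date seven days before that.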
WeekClampingStrategy( day_of_week: Weekday, is_ceiling: bool = True)
def
clamp(self, value: datetime.datetime) -> datetime.datetime:
def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
    """Return midnight on the target weekday relative to ``value``.

    Ceiling mode moves forward to the target weekday, staying on the same
    date when ``value`` already falls on it. Floor mode returns the date
    seven days before the ceiling date.
    """
    offset = value.weekday() - self._day_of_week
    # Past the target weekday: wrap around into the following week.
    days_to_target = 7 - offset if offset > 0 else abs(offset)
    shift = days_to_target if self._is_ceiling else days_to_target - 7
    midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=shift)