airbyte_cdk.sources.streams.concurrent.clamping

 1from abc import ABC
 2from datetime import datetime, timedelta
 3from enum import Enum
 4from typing import Callable
 5
 6from airbyte_cdk.sources.streams.concurrent.cursor_types import CursorValueType
 7
 8
 9class ClampingStrategy(ABC):
10    def clamp(self, value: CursorValueType) -> CursorValueType:
11        raise NotImplementedError()
12
13
class NoClamping(ClampingStrategy):
    """Clamping strategy that leaves the value untouched."""

    def clamp(self, value: CursorValueType) -> CursorValueType:
        """Return *value* unchanged."""
        return value
17
18
class ClampingEndProvider:
    """Callable that yields a clamped end value reduced by one granularity step.

    Args:
        clamping_strategy: Strategy whose ``clamp`` is applied to the raw end value.
        end_provider: Zero-argument callable producing the raw end value.
        granularity: Amount subtracted from the clamped value on every call.
    """

    def __init__(
        self,
        clamping_strategy: ClampingStrategy,
        end_provider: Callable[[], CursorValueType],
        granularity: timedelta,
    ) -> None:
        self._clamping_strategy = clamping_strategy
        self._end_provider = end_provider
        self._granularity = granularity

    def __call__(self) -> CursorValueType:
        """Fetch the end value, clamp it, then subtract the granularity."""
        # The provider is invoked on every call, so the result tracks whatever
        # the provider currently returns rather than a value captured at init.
        clamped_end = self._clamping_strategy.clamp(self._end_provider())
        return clamped_end - self._granularity
32
33
class DayClampingStrategy(ClampingStrategy):
    """Clamp a datetime to a day boundary.

    Args:
        is_ceiling: When true (the default), clamp to midnight of the following
            day; otherwise clamp to midnight of the same day.
    """

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return *value* truncated to midnight, plus one day when ceiling.

        In ceiling mode the extra day is added unconditionally, including when
        *value* is already exactly at midnight.
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        if not self._is_ceiling:
            return midnight
        return midnight + timedelta(days=1)
43
44
class MonthClampingStrategy(ClampingStrategy):
    """Clamp a datetime to the first day of a month.

    Args:
        is_ceiling: When true (the default), values not on day 1 move to the
            first day of the next month; otherwise they move to the first day
            of their own month.
    """

    def __init__(self, is_ceiling: bool = True) -> None:
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight on the month boundary selected for *value*.

        A value whose day is 1 maps to midnight of that same day in both
        modes; its time of day is discarded.
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        # Flooring, and any value already on day 1, both land on the first day
        # of the value's own month.
        if value.day == 1 or not self._is_ceiling:
            return midnight.replace(day=1)
        return self._ceil(midnight)

    def _ceil(self, value: datetime) -> datetime:
        """Return midnight on the first day of the month after *value*'s month."""
        rolls_over = value.month == 12  # December wraps into January of next year
        return value.replace(
            year=value.year + 1 if rolls_over else value.year,
            month=1 if rolls_over else value.month + 1,
            day=1,
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
        )
67
68
class Weekday(Enum):
    """
    Days of the week, numbered to match datetime.date.weekday() (Monday is 0, Sunday is 6)
    """

    MONDAY = 0
    TUESDAY = 1
    WEDNESDAY = 2
    THURSDAY = 3
    FRIDAY = 4
    SATURDAY = 5
    SUNDAY = 6
81
82
class WeekClampingStrategy(ClampingStrategy):
    """Clamp a datetime to midnight on a configured day of the week.

    Args:
        day_of_week: Target weekday that results are aligned to.
        is_ceiling: When true (the default), move forward to the next
            occurrence of the target weekday; otherwise move backward to the
            most recent occurrence.
    """

    def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
        self._day_of_week = day_of_week.value
        self._is_ceiling = is_ceiling

    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
        """Return midnight on the target weekday nearest to *value* in the clamp direction.

        Only the date part of *value* is considered. A value that already
        falls on the target weekday maps to midnight of that same day in both
        modes, so clamping an already-clamped value is a no-op.
        """
        midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
        current_weekday = value.weekday()
        if self._is_ceiling:
            # Days forward to the target weekday: 0..6, 0 when already on it.
            offset_days = (self._day_of_week - current_weekday) % 7
        else:
            # Days backward to the target weekday: 0..-6. A plain
            # "ceiling offset - 7" would send a value already on the target
            # weekday a full week into the past.
            offset_days = -((current_weekday - self._day_of_week) % 7)
        return midnight + timedelta(days=offset_days)
class ClampingStrategy(abc.ABC):
10class ClampingStrategy(ABC):
11    def clamp(self, value: CursorValueType) -> CursorValueType:
12        raise NotImplementedError()

Base interface for clamping strategies. Subclasses override `clamp`; the base implementation raises `NotImplementedError`.

11    def clamp(self, value: CursorValueType) -> CursorValueType:
12        raise NotImplementedError()
class NoClamping(ClampingStrategy):
15class NoClamping(ClampingStrategy):
16    def clamp(self, value: CursorValueType) -> CursorValueType:
17        return value

Clamping strategy that returns the value unchanged.

16    def clamp(self, value: CursorValueType) -> CursorValueType:
17        return value
class ClampingEndProvider:
20class ClampingEndProvider:
21    def __init__(
22        self,
23        clamping_strategy: ClampingStrategy,
24        end_provider: Callable[[], CursorValueType],
25        granularity: timedelta,
26    ) -> None:
27        self._clamping_strategy = clamping_strategy
28        self._end_provider = end_provider
29        self._granularity = granularity
30
31    def __call__(self) -> CursorValueType:
32        return self._clamping_strategy.clamp(self._end_provider()) - self._granularity
ClampingEndProvider( clamping_strategy: ClampingStrategy, end_provider: Callable[[], airbyte_cdk.sources.streams.concurrent.cursor_types.CursorValueType], granularity: datetime.timedelta)
21    def __init__(
22        self,
23        clamping_strategy: ClampingStrategy,
24        end_provider: Callable[[], CursorValueType],
25        granularity: timedelta,
26    ) -> None:
27        self._clamping_strategy = clamping_strategy
28        self._end_provider = end_provider
29        self._granularity = granularity
class DayClampingStrategy(ClampingStrategy):
35class DayClampingStrategy(ClampingStrategy):
36    def __init__(self, is_ceiling: bool = True) -> None:
37        self._is_ceiling = is_ceiling
38
39    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
40        return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
41        if self._is_ceiling:
42            return return_value + timedelta(days=1)
43        return return_value

Clamping strategy that truncates a datetime to midnight and, when `is_ceiling` is true, advances it by one day.

DayClampingStrategy(is_ceiling: bool = True)
36    def __init__(self, is_ceiling: bool = True) -> None:
37        self._is_ceiling = is_ceiling
def clamp(self, value: datetime.datetime) -> datetime.datetime:
39    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
40        return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
41        if self._is_ceiling:
42            return return_value + timedelta(days=1)
43        return return_value
class MonthClampingStrategy(ClampingStrategy):
46class MonthClampingStrategy(ClampingStrategy):
47    def __init__(self, is_ceiling: bool = True) -> None:
48        self._is_ceiling = is_ceiling
49
50    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
51        return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
52        needs_to_round = value.day != 1
53        if not needs_to_round:
54            return return_value
55
56        return self._ceil(return_value) if self._is_ceiling else return_value.replace(day=1)
57
58    def _ceil(self, value: datetime) -> datetime:
59        return value.replace(
60            year=value.year + 1 if value.month == 12 else value.year,
61            month=(value.month % 12) + 1,
62            day=1,
63            hour=0,
64            minute=0,
65            second=0,
66            microsecond=0,
67        )

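Clamping strategy that aligns a datetime to midnight on the first day of a month: the next month when `is_ceiling` is true and the day is not 1, otherwise the value's own month.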

MonthClampingStrategy(is_ceiling: bool = True)
47    def __init__(self, is_ceiling: bool = True) -> None:
48        self._is_ceiling = is_ceiling
def clamp(self, value: datetime.datetime) -> datetime.datetime:
50    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
51        return_value = value.replace(hour=0, minute=0, second=0, microsecond=0)
52        needs_to_round = value.day != 1
53        if not needs_to_round:
54            return return_value
55
56        return self._ceil(return_value) if self._is_ceiling else return_value.replace(day=1)
class Weekday(enum.Enum):
70class Weekday(Enum):
71    """
72    These integer values map to the same ones used by the Datetime.date.weekday() implementation
73    """
74
75    MONDAY = 0
76    TUESDAY = 1
77    WEDNESDAY = 2
78    THURSDAY = 3
79    FRIDAY = 4
80    SATURDAY = 5
81    SUNDAY = 6

These integer values match the ones returned by `datetime.date.weekday()` (Monday is 0, Sunday is 6).

MONDAY = <Weekday.MONDAY: 0>
TUESDAY = <Weekday.TUESDAY: 1>
WEDNESDAY = <Weekday.WEDNESDAY: 2>
THURSDAY = <Weekday.THURSDAY: 3>
FRIDAY = <Weekday.FRIDAY: 4>
SATURDAY = <Weekday.SATURDAY: 5>
SUNDAY = <Weekday.SUNDAY: 6>
class WeekClampingStrategy(ClampingStrategy):
 84class WeekClampingStrategy(ClampingStrategy):
 85    def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
 86        self._day_of_week = day_of_week.value
 87        self._is_ceiling = is_ceiling
 88
 89    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
 90        days_diff_to_ceiling = (
 91            7 - (value.weekday() - self._day_of_week)
 92            if value.weekday() > self._day_of_week
 93            else abs(value.weekday() - self._day_of_week)
 94        )
 95        delta = (
 96            timedelta(days_diff_to_ceiling)
 97            if self._is_ceiling
 98            else timedelta(days_diff_to_ceiling - 7)
 99        )
100        return value.replace(hour=0, minute=0, second=0, microsecond=0) + delta

Clamping strategy that aligns a datetime to midnight on the configured weekday: moving forward when `is_ceiling` is true, backward otherwise.

WeekClampingStrategy( day_of_week: Weekday, is_ceiling: bool = True)
85    def __init__(self, day_of_week: Weekday, is_ceiling: bool = True) -> None:
86        self._day_of_week = day_of_week.value
87        self._is_ceiling = is_ceiling
def clamp(self, value: datetime.datetime) -> datetime.datetime:
 89    def clamp(self, value: datetime) -> datetime:  # type: ignore  # datetime implements method from CursorValueType
 90        days_diff_to_ceiling = (
 91            7 - (value.weekday() - self._day_of_week)
 92            if value.weekday() > self._day_of_week
 93            else abs(value.weekday() - self._day_of_week)
 94        )
 95        delta = (
 96            timedelta(days_diff_to_ceiling)
 97            if self._is_ceiling
 98            else timedelta(days_diff_to_ceiling - 7)
 99        )
100        return value.replace(hour=0, minute=0, second=0, microsecond=0) + delta