airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from abc import abstractmethod
  6from datetime import datetime, timedelta, timezone
  7from typing import Any, Callable, List, MutableMapping, Optional, Tuple
  8
  9# FIXME We would eventually like the Concurrent package to be agnostic of the declarative package. However, this is a breaking change and
 10#  the goal in the short term is only to fix the issue we are seeing for source-declarative-manifest.
 11from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
 12from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
 13from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 14    AbstractStreamStateConverter,
 15    ConcurrencyCompatibleStateType,
 16)
 17from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse
 18
 19
 20class DateTimeStreamStateConverter(AbstractStreamStateConverter):
 21    def _from_state_message(self, value: Any) -> Any:
 22        return self.parse_timestamp(value)
 23
 24    def _to_state_message(self, value: Any) -> Any:
 25        return self.output_format(value)
 26
 27    @property
 28    @abstractmethod
 29    def _zero_value(self) -> Any: ...
 30
 31    @property
 32    def zero_value(self) -> datetime:
 33        return self.parse_timestamp(self._zero_value)
 34
 35    @classmethod
 36    def get_end_provider(cls) -> Callable[[], datetime]:
 37        return ab_datetime_now
 38
 39    @abstractmethod
 40    def increment(self, timestamp: datetime) -> datetime: ...
 41
 42    @abstractmethod
 43    def parse_timestamp(self, timestamp: Any) -> datetime: ...
 44
 45    @abstractmethod
 46    def output_format(self, timestamp: datetime) -> Any: ...
 47
 48    def parse_value(self, value: Any) -> Any:
 49        """
 50        Parse the value of the cursor field into a comparable value.
 51        """
 52        return self.parse_timestamp(value)
 53
 54    def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
 55        return bool(self.increment(end_time) >= start_time)
 56
 57    def convert_from_sequential_state(
 58        self,
 59        cursor_field: CursorField,
 60        stream_state: MutableMapping[str, Any],
 61        start: Optional[datetime],
 62    ) -> Tuple[datetime, MutableMapping[str, Any]]:
 63        """
 64        Convert the state message to the format required by the ConcurrentCursor.
 65
 66        e.g.
 67        {
 68            "state_type": ConcurrencyCompatibleStateType.date_range.value,
 69            "metadata": { … },
 70            "slices": [
 71                {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
 72            ]
 73        }
 74        """
 75        sync_start = self._get_sync_start(cursor_field, stream_state, start)
 76        if self.is_state_message_compatible(stream_state):
 77            return sync_start, stream_state
 78
 79        # Create a slice to represent the records synced during prior syncs.
 80        # The start and end are the same to avoid confusion as to whether the records for this slice
 81        # were actually synced
 82        slices = [
 83            {
 84                self.START_KEY: start if start is not None else sync_start,
 85                self.END_KEY: sync_start,
 86                self.MOST_RECENT_RECORD_KEY: sync_start,
 87            }
 88        ]
 89
 90        return sync_start, {
 91            "state_type": ConcurrencyCompatibleStateType.date_range.value,
 92            "slices": slices,
 93            "legacy": stream_state,
 94        }
 95
 96    def _get_sync_start(
 97        self,
 98        cursor_field: CursorField,
 99        stream_state: MutableMapping[str, Any],
100        start: Optional[datetime],
101    ) -> datetime:
102        sync_start = start if start is not None else self.zero_value
103        prev_sync_low_water_mark = (
104            self.parse_timestamp(stream_state[cursor_field.cursor_field_key])
105            if cursor_field.cursor_field_key in stream_state
106            else None
107        )
108        if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
109            return prev_sync_low_water_mark
110        else:
111            return sync_start
112
113
class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    Converter for cursor values serialized as integer epoch seconds.

    e.g.
    { "created": 1617030403 }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: 0, end: 1617030403, finished_processing: true}
        ]
    }
    """

    _zero_value = 0

    def increment(self, timestamp: datetime) -> datetime:
        """Advance ``timestamp`` by one second."""
        one_second = timedelta(seconds=1)
        return timestamp + one_second

    def output_format(self, timestamp: datetime) -> int:
        """Serialize ``timestamp`` as whole epoch seconds (fraction dropped by ``int``)."""
        epoch_seconds = timestamp.timestamp()
        return int(epoch_seconds)

    def parse_timestamp(self, timestamp: int) -> datetime:
        """Build a UTC ``AirbyteDateTime`` from an epoch value.

        Raises:
            ValueError: If ``AirbyteDateTime.fromtimestamp`` yields something
                other than an ``AirbyteDateTime``.
        """
        parsed = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
        if isinstance(parsed, AirbyteDateTime):
            return parsed
        raise ValueError(
            f"AirbyteDateTime object was expected but got {type(parsed)} from AirbyteDateTime.fromtimestamp({timestamp})"
        )
143
144
class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    Converter for cursor values serialized as ISO 8601 strings with millisecond precision.

    e.g.
    { "created": "2021-01-18T21:18:20.000Z" }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true}
        ]
    }
    """

    _zero_value = "0001-01-01T00:00:00.000Z"

    def __init__(
        self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None
    ):
        """
        Args:
            is_sequential_state: Forwarded unchanged to the parent constructor.
            cursor_granularity: Step applied by ``increment``. A falsy value
                (None or a zero-length timedelta) falls back to one millisecond.
        """
        super().__init__(is_sequential_state=is_sequential_state)
        self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1)

    def increment(self, timestamp: datetime) -> datetime:
        """Advance ``timestamp`` by the configured cursor granularity."""
        return timestamp + self._cursor_granularity

    def output_format(self, timestamp: datetime) -> str:
        """Format datetime with milliseconds always included.

        Timezone-aware values are converted to UTC first, because the emitted
        string always carries a literal ``Z`` suffix.

        Args:
            timestamp: The datetime to format.

        Returns:
            str: ISO8601/RFC3339 formatted string with milliseconds.
        """
        dt = AirbyteDateTime.from_datetime(timestamp)
        if dt.tzinfo is not None:
            # Without this, a +05:00 value would be written with its local wall
            # time and still labelled "Z", i.e. as a different instant.
            dt = dt.astimezone(timezone.utc)
        # Always include milliseconds, even if zero. Fields are formatted by hand
        # so that the year stays zero-padded to four digits (see ``_zero_value``).
        millis = dt.microsecond // 1000
        return f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}.{millis:03d}Z"

    def parse_timestamp(self, timestamp: str) -> datetime:
        """Parse ``timestamp`` with ``ab_datetime_parse``.

        Raises:
            ValueError: If the parser yields something other than an ``AirbyteDateTime``.
        """
        dt_object = ab_datetime_parse(timestamp)
        if not isinstance(dt_object, AirbyteDateTime):
            raise ValueError(
                f"AirbyteDateTime object was expected but got {type(dt_object)} from parse({timestamp})"
            )
        return dt_object
191
192
class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
    """
    Datetime State converter that emits state according to the supplied datetime format. Incoming state is
    read by trying each of ``input_datetime_formats`` in order, followed by ``datetime_format`` itself.
    """

    def __init__(
        self,
        datetime_format: str,
        input_datetime_formats: Optional[List[str]] = None,
        is_sequential_state: bool = True,
        cursor_granularity: Optional[timedelta] = None,
    ):
        """
        Args:
            datetime_format: Format used for output; also the last format tried when parsing.
            input_datetime_formats: Additional formats tried, in order, when parsing.
                The list is copied and never modified.
            is_sequential_state: Forwarded unchanged to the parent constructor.
            cursor_granularity: Forwarded unchanged to the parent constructor.
        """
        super().__init__(
            is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
        )
        self._datetime_format = datetime_format
        # Build a fresh list: extending the argument in place would leak the extra
        # entry into the caller's list (and grow it on every construction).
        self._input_datetime_formats = [*(input_datetime_formats or []), datetime_format]
        self._parser = DatetimeParser()

    def output_format(self, timestamp: datetime) -> str:
        """Format ``timestamp`` using the configured ``datetime_format``."""
        return self._parser.format(timestamp, self._datetime_format)

    def parse_timestamp(self, timestamp: str) -> datetime:
        """Parse ``timestamp`` with the first configured format that accepts it.

        Raises:
            ValueError: If none of the configured formats matches.
        """
        for datetime_format in self._input_datetime_formats:
            try:
                return self._parser.parse(timestamp, datetime_format)
            except ValueError:
                # A mismatch is expected here; move on to the next candidate format.
                continue
        raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")
 21class DateTimeStreamStateConverter(AbstractStreamStateConverter):
 22    def _from_state_message(self, value: Any) -> Any:
 23        return self.parse_timestamp(value)
 24
 25    def _to_state_message(self, value: Any) -> Any:
 26        return self.output_format(value)
 27
 28    @property
 29    @abstractmethod
 30    def _zero_value(self) -> Any: ...
 31
 32    @property
 33    def zero_value(self) -> datetime:
 34        return self.parse_timestamp(self._zero_value)
 35
 36    @classmethod
 37    def get_end_provider(cls) -> Callable[[], datetime]:
 38        return ab_datetime_now
 39
 40    @abstractmethod
 41    def increment(self, timestamp: datetime) -> datetime: ...
 42
 43    @abstractmethod
 44    def parse_timestamp(self, timestamp: Any) -> datetime: ...
 45
 46    @abstractmethod
 47    def output_format(self, timestamp: datetime) -> Any: ...
 48
 49    def parse_value(self, value: Any) -> Any:
 50        """
 51        Parse the value of the cursor field into a comparable value.
 52        """
 53        return self.parse_timestamp(value)
 54
 55    def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
 56        return bool(self.increment(end_time) >= start_time)
 57
 58    def convert_from_sequential_state(
 59        self,
 60        cursor_field: CursorField,
 61        stream_state: MutableMapping[str, Any],
 62        start: Optional[datetime],
 63    ) -> Tuple[datetime, MutableMapping[str, Any]]:
 64        """
 65        Convert the state message to the format required by the ConcurrentCursor.
 66
 67        e.g.
 68        {
 69            "state_type": ConcurrencyCompatibleStateType.date_range.value,
 70            "metadata": { … },
 71            "slices": [
 72                {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
 73            ]
 74        }
 75        """
 76        sync_start = self._get_sync_start(cursor_field, stream_state, start)
 77        if self.is_state_message_compatible(stream_state):
 78            return sync_start, stream_state
 79
 80        # Create a slice to represent the records synced during prior syncs.
 81        # The start and end are the same to avoid confusion as to whether the records for this slice
 82        # were actually synced
 83        slices = [
 84            {
 85                self.START_KEY: start if start is not None else sync_start,
 86                self.END_KEY: sync_start,
 87                self.MOST_RECENT_RECORD_KEY: sync_start,
 88            }
 89        ]
 90
 91        return sync_start, {
 92            "state_type": ConcurrencyCompatibleStateType.date_range.value,
 93            "slices": slices,
 94            "legacy": stream_state,
 95        }
 96
 97    def _get_sync_start(
 98        self,
 99        cursor_field: CursorField,
100        stream_state: MutableMapping[str, Any],
101        start: Optional[datetime],
102    ) -> datetime:
103        sync_start = start if start is not None else self.zero_value
104        prev_sync_low_water_mark = (
105            self.parse_timestamp(stream_state[cursor_field.cursor_field_key])
106            if cursor_field.cursor_field_key in stream_state
107            else None
108        )
109        if prev_sync_low_water_mark and prev_sync_low_water_mark >= sync_start:
110            return prev_sync_low_water_mark
111        else:
112            return sync_start

Helper class that provides a standard way to create an ABC using inheritance.

zero_value: datetime.datetime
32    @property
33    def zero_value(self) -> datetime:
34        return self.parse_timestamp(self._zero_value)
@classmethod
def get_end_provider(cls) -> Callable[[], datetime.datetime]:
36    @classmethod
37    def get_end_provider(cls) -> Callable[[], datetime]:
38        return ab_datetime_now
@abstractmethod
def increment(self, timestamp: datetime.datetime) -> datetime.datetime:
40    @abstractmethod
41    def increment(self, timestamp: datetime) -> datetime: ...

Increment a timestamp by a single unit.

@abstractmethod
def parse_timestamp(self, timestamp: Any) -> datetime.datetime:
43    @abstractmethod
44    def parse_timestamp(self, timestamp: Any) -> datetime: ...
@abstractmethod
def output_format(self, timestamp: datetime.datetime) -> Any:
46    @abstractmethod
47    def output_format(self, timestamp: datetime) -> Any: ...

Convert the cursor value type to a JSON valid type.

def parse_value(self, value: Any) -> Any:
49    def parse_value(self, value: Any) -> Any:
50        """
51        Parse the value of the cursor field into a comparable value.
52        """
53        return self.parse_timestamp(value)

Parse the value of the cursor field into a comparable value.

def convert_from_sequential_state( self, cursor_field: airbyte_cdk.CursorField, stream_state: MutableMapping[str, Any], start: Optional[datetime.datetime]) -> Tuple[datetime.datetime, MutableMapping[str, Any]]:
58    def convert_from_sequential_state(
59        self,
60        cursor_field: CursorField,
61        stream_state: MutableMapping[str, Any],
62        start: Optional[datetime],
63    ) -> Tuple[datetime, MutableMapping[str, Any]]:
64        """
65        Convert the state message to the format required by the ConcurrentCursor.
66
67        e.g.
68        {
69            "state_type": ConcurrencyCompatibleStateType.date_range.value,
70            "metadata": { … },
71            "slices": [
72                {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
73            ]
74        }
75        """
76        sync_start = self._get_sync_start(cursor_field, stream_state, start)
77        if self.is_state_message_compatible(stream_state):
78            return sync_start, stream_state
79
80        # Create a slice to represent the records synced during prior syncs.
81        # The start and end are the same to avoid confusion as to whether the records for this slice
82        # were actually synced
83        slices = [
84            {
85                self.START_KEY: start if start is not None else sync_start,
86                self.END_KEY: sync_start,
87                self.MOST_RECENT_RECORD_KEY: sync_start,
88            }
89        ]
90
91        return sync_start, {
92            "state_type": ConcurrencyCompatibleStateType.date_range.value,
93            "slices": slices,
94            "legacy": stream_state,
95        }

Convert the state message to the format required by the ConcurrentCursor.

e.g. { "state_type": ConcurrencyCompatibleStateType.date_range.value, "metadata": { … }, "slices": [ {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"}, ] }

class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    Converter for cursor values serialized as integer epoch seconds.

    e.g.
    { "created": 1617030403 }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: 0, end: 1617030403, finished_processing: true}
        ]
    }
    """

    _zero_value = 0

    def increment(self, timestamp: datetime) -> datetime:
        """Advance ``timestamp`` by one second."""
        one_second = timedelta(seconds=1)
        return timestamp + one_second

    def output_format(self, timestamp: datetime) -> int:
        """Serialize ``timestamp`` as whole epoch seconds (fraction dropped by ``int``)."""
        epoch_seconds = timestamp.timestamp()
        return int(epoch_seconds)

    def parse_timestamp(self, timestamp: int) -> datetime:
        """Build a UTC ``AirbyteDateTime`` from an epoch value.

        Raises:
            ValueError: If ``AirbyteDateTime.fromtimestamp`` yields something
                other than an ``AirbyteDateTime``.
        """
        parsed = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
        if isinstance(parsed, AirbyteDateTime):
            return parsed
        raise ValueError(
            f"AirbyteDateTime object was expected but got {type(parsed)} from AirbyteDateTime.fromtimestamp({timestamp})"
        )

e.g. { "created": 1617030403 } => { "state_type": "date-range", "metadata": { … }, "slices": [ {starts: 0, end: 1617030403, finished_processing: true} ] }

def increment(self, timestamp: datetime.datetime) -> datetime.datetime:
131    def increment(self, timestamp: datetime) -> datetime:
132        return timestamp + timedelta(seconds=1)

Increment a timestamp by a single unit.

def output_format(self, timestamp: datetime.datetime) -> int:
134    def output_format(self, timestamp: datetime) -> int:
135        return int(timestamp.timestamp())

Convert the cursor value type to a JSON valid type.

def parse_timestamp(self, timestamp: int) -> datetime.datetime:
137    def parse_timestamp(self, timestamp: int) -> datetime:
138        dt_object = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
139        if not isinstance(dt_object, AirbyteDateTime):
140            raise ValueError(
141                f"AirbyteDateTime object was expected but got {type(dt_object)} from AirbyteDateTime.fromtimestamp({timestamp})"
142            )
143        return dt_object
class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    Converter for cursor values serialized as ISO 8601 strings with millisecond precision.

    e.g.
    { "created": "2021-01-18T21:18:20.000Z" }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true}
        ]
    }
    """

    _zero_value = "0001-01-01T00:00:00.000Z"

    def __init__(
        self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None
    ):
        """
        Args:
            is_sequential_state: Forwarded unchanged to the parent constructor.
            cursor_granularity: Step applied by ``increment``. A falsy value
                (None or a zero-length timedelta) falls back to one millisecond.
        """
        super().__init__(is_sequential_state=is_sequential_state)
        self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1)

    def increment(self, timestamp: datetime) -> datetime:
        """Advance ``timestamp`` by the configured cursor granularity."""
        return timestamp + self._cursor_granularity

    def output_format(self, timestamp: datetime) -> str:
        """Format datetime with milliseconds always included.

        Timezone-aware values are converted to UTC first, because the emitted
        string always carries a literal ``Z`` suffix.

        Args:
            timestamp: The datetime to format.

        Returns:
            str: ISO8601/RFC3339 formatted string with milliseconds.
        """
        dt = AirbyteDateTime.from_datetime(timestamp)
        if dt.tzinfo is not None:
            # Without this, a +05:00 value would be written with its local wall
            # time and still labelled "Z", i.e. as a different instant.
            dt = dt.astimezone(timezone.utc)
        # Always include milliseconds, even if zero. Fields are formatted by hand
        # so that the year stays zero-padded to four digits (see ``_zero_value``).
        millis = dt.microsecond // 1000
        return f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}.{millis:03d}Z"

    def parse_timestamp(self, timestamp: str) -> datetime:
        """Parse ``timestamp`` with ``ab_datetime_parse``.

        Raises:
            ValueError: If the parser yields something other than an ``AirbyteDateTime``.
        """
        dt_object = ab_datetime_parse(timestamp)
        if not isinstance(dt_object, AirbyteDateTime):
            raise ValueError(
                f"AirbyteDateTime object was expected but got {type(dt_object)} from parse({timestamp})"
            )
        return dt_object

e.g. { "created": "2021-01-18T21:18:20.000Z" } => { "state_type": "date-range", "metadata": { … }, "slices": [ {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} ] }

IsoMillisConcurrentStreamStateConverter( is_sequential_state: bool = True, cursor_granularity: Optional[datetime.timedelta] = None)
162    def __init__(
163        self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None
164    ):
165        super().__init__(is_sequential_state=is_sequential_state)
166        self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1)
def increment(self, timestamp: datetime.datetime) -> datetime.datetime:
168    def increment(self, timestamp: datetime) -> datetime:
169        return timestamp + self._cursor_granularity

Increment a timestamp by a single unit.

def output_format(self, timestamp: datetime.datetime) -> str:
171    def output_format(self, timestamp: datetime) -> str:
172        """Format datetime with milliseconds always included.
173
174        Args:
175            timestamp: The datetime to format.
176
177        Returns:
178            str: ISO8601/RFC3339 formatted string with milliseconds.
179        """
180        dt = AirbyteDateTime.from_datetime(timestamp)
181        # Always include milliseconds, even if zero
182        millis = dt.microsecond // 1000 if dt.microsecond else 0
183        return f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}.{millis:03d}Z"

Format datetime with milliseconds always included.

Arguments:
  • timestamp: The datetime to format.
Returns:

str: ISO8601/RFC3339 formatted string with milliseconds.

def parse_timestamp(self, timestamp: str) -> datetime.datetime:
185    def parse_timestamp(self, timestamp: str) -> datetime:
186        dt_object = ab_datetime_parse(timestamp)
187        if not isinstance(dt_object, AirbyteDateTime):
188            raise ValueError(
189                f"AirbyteDateTime object was expected but got {type(dt_object)} from parse({timestamp})"
190            )
191        return dt_object
class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
    """
    Datetime State converter that emits state according to the supplied datetime format. Incoming state is
    read by trying each of ``input_datetime_formats`` in order, followed by ``datetime_format`` itself.
    """

    def __init__(
        self,
        datetime_format: str,
        input_datetime_formats: Optional[List[str]] = None,
        is_sequential_state: bool = True,
        cursor_granularity: Optional[timedelta] = None,
    ):
        """
        Args:
            datetime_format: Format used for output; also the last format tried when parsing.
            input_datetime_formats: Additional formats tried, in order, when parsing.
                The list is copied and never modified.
            is_sequential_state: Forwarded unchanged to the parent constructor.
            cursor_granularity: Forwarded unchanged to the parent constructor.
        """
        super().__init__(
            is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
        )
        self._datetime_format = datetime_format
        # Build a fresh list: extending the argument in place would leak the extra
        # entry into the caller's list (and grow it on every construction).
        self._input_datetime_formats = [*(input_datetime_formats or []), datetime_format]
        self._parser = DatetimeParser()

    def output_format(self, timestamp: datetime) -> str:
        """Format ``timestamp`` using the configured ``datetime_format``."""
        return self._parser.format(timestamp, self._datetime_format)

    def parse_timestamp(self, timestamp: str) -> datetime:
        """Parse ``timestamp`` with the first configured format that accepts it.

        Raises:
            ValueError: If none of the configured formats matches.
        """
        for datetime_format in self._input_datetime_formats:
            try:
                return self._parser.parse(timestamp, datetime_format)
            except ValueError:
                # A mismatch is expected here; move on to the next candidate format.
                continue
        raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")

Datetime State converter that emits state according to the supplied datetime format. The converter supports reading incoming state in any valid datetime format using AirbyteDateTime parsing utilities.

CustomFormatConcurrentStreamStateConverter( datetime_format: str, input_datetime_formats: Optional[List[str]] = None, is_sequential_state: bool = True, cursor_granularity: Optional[datetime.timedelta] = None)
200    def __init__(
201        self,
202        datetime_format: str,
203        input_datetime_formats: Optional[List[str]] = None,
204        is_sequential_state: bool = True,
205        cursor_granularity: Optional[timedelta] = None,
206    ):
207        super().__init__(
208            is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
209        )
210        self._datetime_format = datetime_format
211        self._input_datetime_formats = input_datetime_formats if input_datetime_formats else []
212        self._input_datetime_formats += [self._datetime_format]
213        self._parser = DatetimeParser()
def output_format(self, timestamp: datetime.datetime) -> str:
215    def output_format(self, timestamp: datetime) -> str:
216        return self._parser.format(timestamp, self._datetime_format)

Format the datetime using the `datetime_format` supplied to the constructor.

Arguments:
  • timestamp: The datetime to format.
Returns:

str: The timestamp rendered according to `datetime_format`.

def parse_timestamp(self, timestamp: str) -> datetime.datetime:
218    def parse_timestamp(self, timestamp: str) -> datetime:
219        for datetime_format in self._input_datetime_formats:
220            try:
221                return self._parser.parse(timestamp, datetime_format)
222            except ValueError:
223                pass
224        raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")