airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, List, MutableMapping, Optional, Tuple

# FIXME We would eventually like the Concurrent package to be agnostic of the declarative package. However, this is a breaking change and
# the goal in the short term is only to fix the issue we are seeing for source-declarative-manifest.
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
    ConcurrencyCompatibleStateType,
)
from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse


class DateTimeStreamStateConverter(AbstractStreamStateConverter):
    """
    Base state converter for cursors whose values are datetimes.

    Subclasses supply `_zero_value`, `increment`, `parse_timestamp` and `output_format`.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Parse a value read from a state message with `parse_timestamp`."""
        return self.parse_timestamp(value)

    def _to_state_message(self, value: Any) -> Any:
        """Serialize a value for a state message with `output_format`."""
        return self.output_format(value)

    @property
    @abstractmethod
    def _zero_value(self) -> Any: ...

    @property
    def zero_value(self) -> datetime:
        """The subclass' `_zero_value` parsed with `parse_timestamp`."""
        return self.parse_timestamp(self._zero_value)

    @classmethod
    def get_end_provider(cls) -> Callable[[], datetime]:
        """Return the callable (`ab_datetime_now`) used to obtain the end datetime."""
        return ab_datetime_now

    @abstractmethod
    def increment(self, timestamp: datetime) -> datetime: ...

    @abstractmethod
    def parse_timestamp(self, timestamp: Any) -> datetime: ...

    @abstractmethod
    def output_format(self, timestamp: datetime) -> Any: ...

    def parse_value(self, value: Any) -> Any:
        """
        Parse the value of the cursor field into a comparable value.
        """
        return self.parse_timestamp(value)

    def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
        """Return True when `end_time`, incremented once, is at or after `start_time`."""
        return bool(self.increment(end_time) >= start_time)

    def convert_from_sequential_state(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[datetime],
    ) -> Tuple[datetime, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
            ]
        }

        Returns:
            A `(sync_start, state)` tuple. `state` is `stream_state` itself when it is already
            compatible, otherwise a new mapping that keeps the original state under "legacy".
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)
        if self.is_state_message_compatible(stream_state):
            return sync_start, stream_state

        # Create a slice to represent the records synced during prior syncs.
        # The start and end are the same to avoid confusion as to whether the records for this slice
        # were actually synced
        slices = [
            {
                self.START_KEY: start if start is not None else sync_start,
                self.END_KEY: sync_start,
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
        ]

        return sync_start, {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "slices": slices,
            "legacy": stream_state,
        }

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[datetime],
    ) -> datetime:
        """
        Pick the datetime the sync starts from.

        Uses `start` (or `zero_value` when `start` is None), unless the state holds a cursor
        value that is at or after it, in which case that cursor value is returned.
        """
        sync_start = start if start is not None else self.zero_value
        prev_sync_low_water_mark = (
            self.parse_timestamp(stream_state[cursor_field.cursor_field_key])
            if cursor_field.cursor_field_key in stream_state
            else None
        )
        # Explicit None check: the absence of a previous cursor value is what is being tested here.
        if prev_sync_low_water_mark is not None and prev_sync_low_water_mark >= sync_start:
            return prev_sync_low_water_mark
        return sync_start


class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    e.g.
    { "created": 1617030403 }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: 0, end: 1617030403, finished_processing: true}
        ]
    }
    """

    _zero_value = 0

    def increment(self, timestamp: datetime) -> datetime:
        """Return `timestamp` advanced by one second."""
        return timestamp + timedelta(seconds=1)

    def output_format(self, timestamp: datetime) -> int:
        """Return `timestamp` as whole epoch seconds (fractional part truncated by `int`)."""
        return int(timestamp.timestamp())

    def parse_timestamp(self, timestamp: int) -> datetime:
        """
        Build a UTC `AirbyteDateTime` from an epoch timestamp.

        Raises:
            ValueError: if `AirbyteDateTime.fromtimestamp` does not return an `AirbyteDateTime`.
        """
        dt_object = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
        if not isinstance(dt_object, AirbyteDateTime):
            raise ValueError(
                f"AirbyteDateTime object was expected but got {type(dt_object)} from AirbyteDateTime.fromtimestamp({timestamp})"
            )
        return dt_object


class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    e.g.
    { "created": "2021-01-18T21:18:20.000Z" }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true}
        ]
    }
    """

    _zero_value = "0001-01-01T00:00:00.000Z"

    def __init__(
        self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None
    ):
        """
        Args:
            is_sequential_state: Forwarded to the parent converter.
            cursor_granularity: Step used by `increment`; one millisecond when not provided
                (a falsy value such as a zero timedelta also falls back to one millisecond).
        """
        super().__init__(is_sequential_state=is_sequential_state)
        self._cursor_granularity = cursor_granularity or timedelta(milliseconds=1)

    def increment(self, timestamp: datetime) -> datetime:
        """Return `timestamp` advanced by the configured cursor granularity."""
        return timestamp + self._cursor_granularity

    def output_format(self, timestamp: datetime) -> str:
        """Format datetime with milliseconds always included.

        Args:
            timestamp: The datetime to format.

        Returns:
            str: ISO8601/RFC3339 formatted string with milliseconds.
        """
        dt = AirbyteDateTime.from_datetime(timestamp)
        # NOTE(review): the "Z" suffix is written unconditionally; presumably `dt` is in UTC — confirm.
        # Always include milliseconds, even if zero (sub-millisecond precision is truncated)
        millis = dt.microsecond // 1000
        return f"{dt.year:04d}-{dt.month:02d}-{dt.day:02d}T{dt.hour:02d}:{dt.minute:02d}:{dt.second:02d}.{millis:03d}Z"

    def parse_timestamp(self, timestamp: str) -> datetime:
        """
        Parse `timestamp` with `ab_datetime_parse`.

        Raises:
            ValueError: if the parser does not return an `AirbyteDateTime`.
        """
        dt_object = ab_datetime_parse(timestamp)
        if not isinstance(dt_object, AirbyteDateTime):
            raise ValueError(
                f"AirbyteDateTime object was expected but got {type(dt_object)} from parse({timestamp})"
            )
        return dt_object


class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
    """
    Datetime State converter that emits state according to the supplied datetime format. Incoming state is read by
    trying each of the supplied input datetime formats, followed by the output datetime format.
    """

    def __init__(
        self,
        datetime_format: str,
        input_datetime_formats: Optional[List[str]] = None,
        is_sequential_state: bool = True,
        cursor_granularity: Optional[timedelta] = None,
    ):
        """
        Args:
            datetime_format: Format used to emit state; also tried last when parsing.
            input_datetime_formats: Formats tried, in order, when parsing incoming values.
            is_sequential_state: Forwarded to the parent converter.
            cursor_granularity: Forwarded to the parent converter.
        """
        super().__init__(
            is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
        )
        self._datetime_format = datetime_format
        # Build a new list so the caller's `input_datetime_formats` is never mutated.
        self._input_datetime_formats = [*(input_datetime_formats or []), self._datetime_format]
        self._parser = DatetimeParser()

    def output_format(self, timestamp: datetime) -> str:
        """Format `timestamp` with the parser using the configured `datetime_format`."""
        return self._parser.format(timestamp, self._datetime_format)

    def parse_timestamp(self, timestamp: str) -> datetime:
        """
        Parse `timestamp` with the first configured format that accepts it.

        Raises:
            ValueError: if no configured format matches.
        """
        for datetime_format in self._input_datetime_formats:
            try:
                return self._parser.parse(timestamp, datetime_format)
            except ValueError:
                # This format did not match; fall through to the next candidate.
                pass
        raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")
class DateTimeStreamStateConverter(AbstractStreamStateConverter):
    """
    Base state converter for cursors whose values are datetimes.

    Concrete converters implement `_zero_value`, `increment`, `parse_timestamp` and `output_format`.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Read a state-message value by parsing it as a timestamp."""
        parsed = self.parse_timestamp(value)
        return parsed

    def _to_state_message(self, value: Any) -> Any:
        """Write a value to a state message in the converter's output format."""
        formatted = self.output_format(value)
        return formatted

    @property
    @abstractmethod
    def _zero_value(self) -> Any: ...

    @property
    def zero_value(self) -> datetime:
        """The parsed form of the subclass' `_zero_value`."""
        raw_zero = self._zero_value
        return self.parse_timestamp(raw_zero)

    @classmethod
    def get_end_provider(cls) -> Callable[[], datetime]:
        """Hand back `ab_datetime_now` as the end-datetime provider."""
        return ab_datetime_now

    @abstractmethod
    def increment(self, timestamp: datetime) -> datetime: ...

    @abstractmethod
    def parse_timestamp(self, timestamp: Any) -> datetime: ...

    @abstractmethod
    def output_format(self, timestamp: datetime) -> Any: ...

    def parse_value(self, value: Any) -> Any:
        """
        Turn a raw cursor field value into something comparable.
        """
        comparable = self.parse_timestamp(value)
        return comparable

    def _compare_intervals(self, end_time: Any, start_time: Any) -> bool:
        """Tell whether `end_time`, stepped forward once, is at or after `start_time`."""
        stepped_end = self.increment(end_time)
        return bool(stepped_end >= start_time)

    def convert_from_sequential_state(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[datetime],
    ) -> Tuple[datetime, MutableMapping[str, Any]]:
        """
        Produce the state shape the ConcurrentCursor works with.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
            ]
        }

        A state that is already compatible is returned untouched, together with the sync start.
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)

        if not self.is_state_message_compatible(stream_state):
            # One slice stands in for everything synced previously. Its end and most recent
            # record are both the sync start, so it never suggests records beyond that point.
            slice_start = sync_start if start is None else start
            prior_slice = {
                self.START_KEY: slice_start,
                self.END_KEY: sync_start,
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
            converted_state = {
                "state_type": ConcurrencyCompatibleStateType.date_range.value,
                "slices": [prior_slice],
                "legacy": stream_state,
            }
            return sync_start, converted_state

        return sync_start, stream_state

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[datetime],
    ) -> datetime:
        """
        Choose between the requested start (or `zero_value`) and the cursor value held in state.

        The state's cursor value wins when it is present, truthy and not earlier than the start.
        """
        if start is None:
            sync_start = self.zero_value
        else:
            sync_start = start

        cursor_key = cursor_field.cursor_field_key
        if cursor_key not in stream_state:
            return sync_start

        low_water_mark = self.parse_timestamp(stream_state[cursor_key])
        if low_water_mark and low_water_mark >= sync_start:
            return low_water_mark
        return sync_start
Helper class that provides a standard way to create an ABC using inheritance.
Increment a timestamp by a single unit.
Convert the cursor value type to a JSON valid type.
def parse_value(self, value: Any) -> Any:
    """
    Turn a raw cursor field value into a comparable value by delegating to `parse_timestamp`.
    """
    comparable = self.parse_timestamp(value)
    return comparable
Parse the value of the cursor field into a comparable value.
def convert_from_sequential_state(
    self,
    cursor_field: CursorField,
    stream_state: MutableMapping[str, Any],
    start: Optional[datetime],
) -> Tuple[datetime, MutableMapping[str, Any]]:
    """
    Produce the state shape the ConcurrentCursor works with.

    e.g.
    {
        "state_type": ConcurrencyCompatibleStateType.date_range.value,
        "metadata": { … },
        "slices": [
            {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"},
        ]
    }

    A state that is already compatible is returned untouched, together with the sync start.
    """
    sync_start = self._get_sync_start(cursor_field, stream_state, start)

    if not self.is_state_message_compatible(stream_state):
        # One slice stands in for everything synced previously. Its end and most recent
        # record are both the sync start, so it never suggests records beyond that point.
        slice_start = sync_start if start is None else start
        prior_slice = {
            self.START_KEY: slice_start,
            self.END_KEY: sync_start,
            self.MOST_RECENT_RECORD_KEY: sync_start,
        }
        converted_state = {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "slices": [prior_slice],
            "legacy": stream_state,
        }
        return sync_start, converted_state

    return sync_start, stream_state
Convert the state message to the format required by the ConcurrentCursor.
e.g. { "state_type": ConcurrencyCompatibleStateType.date_range.value, "metadata": { … }, "slices": [ {"start": "2021-01-18T21:18:20.000+00:00", "end": "2021-01-18T21:18:20.000+00:00"}, ] }
class EpochValueConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    State converter for cursors stored as epoch timestamps.

    e.g.
    { "created": 1617030403 }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: 0, end: 1617030403, finished_processing: true}
        ]
    }
    """

    _zero_value = 0

    def increment(self, timestamp: datetime) -> datetime:
        """Step `timestamp` forward by one second."""
        one_second = timedelta(seconds=1)
        return timestamp + one_second

    def output_format(self, timestamp: datetime) -> int:
        """Express `timestamp` as an integer number of epoch seconds."""
        epoch_seconds = timestamp.timestamp()
        return int(epoch_seconds)

    def parse_timestamp(self, timestamp: int) -> datetime:
        """
        Create a UTC `AirbyteDateTime` from an epoch timestamp.

        Raises a ValueError when the result is not an `AirbyteDateTime`.
        """
        parsed = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
        if isinstance(parsed, AirbyteDateTime):
            return parsed
        raise ValueError(
            f"AirbyteDateTime object was expected but got {type(parsed)} from AirbyteDateTime.fromtimestamp({timestamp})"
        )
e.g. { "created": 1617030403 } => { "state_type": "date-range", "metadata": { … }, "slices": [ {starts: 0, end: 1617030403, finished_processing: true} ] }
def increment(self, timestamp: datetime) -> datetime:
    """Step `timestamp` forward by one second."""
    one_second = timedelta(seconds=1)
    return timestamp + one_second
Increment a timestamp by a single unit.
Convert the cursor value type to a JSON valid type.
def parse_timestamp(self, timestamp: int) -> datetime:
    """
    Create a UTC `AirbyteDateTime` from an epoch timestamp.

    Raises a ValueError when the result is not an `AirbyteDateTime`.
    """
    parsed = AirbyteDateTime.fromtimestamp(timestamp, timezone.utc)
    if isinstance(parsed, AirbyteDateTime):
        return parsed
    raise ValueError(
        f"AirbyteDateTime object was expected but got {type(parsed)} from AirbyteDateTime.fromtimestamp({timestamp})"
    )
Inherited Members
class IsoMillisConcurrentStreamStateConverter(DateTimeStreamStateConverter):
    """
    State converter for cursors stored as ISO timestamps with millisecond precision.

    e.g.
    { "created": "2021-01-18T21:18:20.000Z" }
    =>
    {
        "state_type": "date-range",
        "metadata": { … },
        "slices": [
            {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true}
        ]
    }
    """

    _zero_value = "0001-01-01T00:00:00.000Z"

    def __init__(
        self, is_sequential_state: bool = True, cursor_granularity: Optional[timedelta] = None
    ):
        """Store the cursor granularity, falling back to one millisecond when none is given."""
        super().__init__(is_sequential_state=is_sequential_state)
        if cursor_granularity:
            self._cursor_granularity = cursor_granularity
        else:
            self._cursor_granularity = timedelta(milliseconds=1)

    def increment(self, timestamp: datetime) -> datetime:
        """Step `timestamp` forward by the configured cursor granularity."""
        step = self._cursor_granularity
        return timestamp + step

    def output_format(self, timestamp: datetime) -> str:
        """Render a datetime as text that always carries a three-digit millisecond field.

        Args:
            timestamp: The datetime to render.

        Returns:
            str: `YYYY-MM-DDTHH:MM:SS.mmmZ` formatted string.
        """
        moment = AirbyteDateTime.from_datetime(timestamp)
        date_part = f"{moment.year:04d}-{moment.month:02d}-{moment.day:02d}"
        time_part = f"{moment.hour:02d}:{moment.minute:02d}:{moment.second:02d}"
        # Integer division truncates microseconds to whole milliseconds (zero stays zero).
        millis = moment.microsecond // 1000
        return f"{date_part}T{time_part}.{millis:03d}Z"

    def parse_timestamp(self, timestamp: str) -> datetime:
        """
        Parse `timestamp` with `ab_datetime_parse`.

        Raises a ValueError when the result is not an `AirbyteDateTime`.
        """
        parsed = ab_datetime_parse(timestamp)
        if isinstance(parsed, AirbyteDateTime):
            return parsed
        raise ValueError(
            f"AirbyteDateTime object was expected but got {type(parsed)} from parse({timestamp})"
        )
e.g. { "created": "2021-01-18T21:18:20.000Z" } => { "state_type": "date-range", "metadata": { … }, "slices": [ {starts: "2020-01-18T21:18:20.000Z", end: "2021-01-18T21:18:20.000Z", finished_processing: true} ] }
def increment(self, timestamp: datetime) -> datetime:
    """Step `timestamp` forward by the configured cursor granularity."""
    step = self._cursor_granularity
    return timestamp + step
Increment a timestamp by a single unit.
def output_format(self, timestamp: datetime) -> str:
    """Render a datetime as text that always carries a three-digit millisecond field.

    Args:
        timestamp: The datetime to render.

    Returns:
        str: `YYYY-MM-DDTHH:MM:SS.mmmZ` formatted string.
    """
    moment = AirbyteDateTime.from_datetime(timestamp)
    date_part = f"{moment.year:04d}-{moment.month:02d}-{moment.day:02d}"
    time_part = f"{moment.hour:02d}:{moment.minute:02d}:{moment.second:02d}"
    # Integer division truncates microseconds to whole milliseconds (zero stays zero).
    millis = moment.microsecond // 1000
    return f"{date_part}T{time_part}.{millis:03d}Z"
Format datetime with milliseconds always included.
Arguments:
- timestamp: The datetime to format.
Returns:
str: ISO8601/RFC3339 formatted string with milliseconds.
Inherited Members
class CustomFormatConcurrentStreamStateConverter(IsoMillisConcurrentStreamStateConverter):
    """
    Datetime State converter that emits state according to the supplied datetime format. Incoming state is read by
    trying each of the supplied input datetime formats, followed by the output datetime format.
    """

    def __init__(
        self,
        datetime_format: str,
        input_datetime_formats: Optional[List[str]] = None,
        is_sequential_state: bool = True,
        cursor_granularity: Optional[timedelta] = None,
    ):
        """
        Args:
            datetime_format: Format used to emit state; also tried last when parsing.
            input_datetime_formats: Formats tried, in order, when parsing incoming values.
            is_sequential_state: Forwarded to the parent converter.
            cursor_granularity: Forwarded to the parent converter.
        """
        super().__init__(
            is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
        )
        self._datetime_format = datetime_format
        # Build a new list rather than extending the argument in place: the caller's list must
        # not grow every time a converter is constructed from it.
        self._input_datetime_formats = [*(input_datetime_formats or []), self._datetime_format]
        self._parser = DatetimeParser()

    def output_format(self, timestamp: datetime) -> str:
        """Format `timestamp` with the parser using the configured `datetime_format`."""
        return self._parser.format(timestamp, self._datetime_format)

    def parse_timestamp(self, timestamp: str) -> datetime:
        """
        Parse `timestamp` with the first configured format that accepts it.

        Raises:
            ValueError: if no configured format matches.
        """
        for datetime_format in self._input_datetime_formats:
            try:
                return self._parser.parse(timestamp, datetime_format)
            except ValueError:
                # This format did not match; fall through to the next candidate.
                pass
        raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")
Datetime State converter that emits state according to the supplied datetime format. When reading incoming state, the converter tries each of the supplied input datetime formats, followed by the output datetime format, and raises an error if none of them matches.
def __init__(
    self,
    datetime_format: str,
    input_datetime_formats: Optional[List[str]] = None,
    is_sequential_state: bool = True,
    cursor_granularity: Optional[timedelta] = None,
):
    """
    Args:
        datetime_format: Format used to emit state; also tried last when parsing.
        input_datetime_formats: Formats tried, in order, when parsing incoming values.
        is_sequential_state: Forwarded to the parent converter.
        cursor_granularity: Forwarded to the parent converter.
    """
    super().__init__(
        is_sequential_state=is_sequential_state, cursor_granularity=cursor_granularity
    )
    self._datetime_format = datetime_format
    # Build a new list rather than extending the argument in place: the caller's list must
    # not grow every time a converter is constructed from it.
    self._input_datetime_formats = [*(input_datetime_formats or []), self._datetime_format]
    self._parser = DatetimeParser()
def output_format(self, timestamp: datetime) -> str:
    """Render `timestamp` through the parser using the configured datetime format."""
    configured_format = self._datetime_format
    return self._parser.format(timestamp, configured_format)
Format the datetime using the configured datetime format.
Arguments:
- timestamp: The datetime to format.
Returns:
str: The timestamp rendered by the parser in the configured datetime format.
def parse_timestamp(self, timestamp: str) -> datetime:
    """
    Return the result of the first configured format that parses `timestamp`.

    Formats are tried in their stored order; a ValueError is raised when none of them matches.
    """
    for candidate_format in self._input_datetime_formats:
        try:
            parsed = self._parser.parse(timestamp, candidate_format)
        except ValueError:
            # Not this format; move on to the next candidate.
            continue
        return parsed
    raise ValueError(f"No format in {self._input_datetime_formats} matching {timestamp}")