airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple

if TYPE_CHECKING:
    from airbyte_cdk.sources.streams.concurrent.cursor import CursorField


class ConcurrencyCompatibleStateType(Enum):
    """Values of the `state_type` key that mark a state message as concurrency-compatible."""

    date_range = "date-range"
    integer = "integer"


class AbstractStreamStateConverter(ABC):
    """
    Base class for converting stream state between a stream's original (sequential) format and
    the concurrency-compatible, slice-based format used by the ConcurrentCursor.

    Subclasses define how cursor values are parsed, incremented and (de)serialized.
    """

    START_KEY = "start"
    END_KEY = "end"
    MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"

    @abstractmethod
    def _from_state_message(self, value: Any) -> Any:
        """Convert a value read from a state message into the converter's internal representation."""
        pass

    @abstractmethod
    def _to_state_message(self, value: Any) -> Any:
        """Convert an internal value into the representation written to a state message."""
        pass

    def __init__(self, is_sequential_state: bool = True):
        # When True, `convert_to_state_message` emits the stream's original (legacy) format.
        self._is_sequential_state = is_sequential_state

    def convert_to_state_message(
        self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """
        Convert the state message from the concurrency-compatible format to the stream's original format.

        e.g.
        { "created": "2021-01-18T21:18:20.000Z" }

        If `stream_state` is not concurrency-compatible, or the converter was created with
        `is_sequential_state=False`, the state is serialized in the concurrency-compatible
        `date-range` format instead.

        The nested "legacy" mapping of `stream_state` is copied rather than updated in place.
        """
        if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
            # Copy so the caller's stored legacy state is not mutated as a side effect;
            # `or {}` also tolerates an explicit `"legacy": None`.
            legacy_state = dict(stream_state.get("legacy") or {})
            latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
            if latest_complete_time is not None:
                legacy_state[cursor_field.cursor_field_key] = self._to_state_message(
                    latest_complete_time
                )
            return legacy_state
        return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)

    def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
        """
        Get the latest time before which all records have been processed.

        This is the most recent cursor value of the first merged interval. If no cursor value
        was tracked for that interval, the interval's start is used instead.

        Raises:
            RuntimeError: if `slices` is empty.
        """
        if not slices:
            raise RuntimeError(
                "Expected at least one slice but there were none. This is unexpected; please contact Support."
            )
        first_interval = self.merge_intervals(slices)[0]
        most_recent_cursor_value = first_interval.get(self.MOST_RECENT_RECORD_KEY)
        # Compare against None so falsy but valid cursor values (e.g. integer 0) are kept.
        if most_recent_cursor_value is not None:
            return most_recent_cursor_value
        return first_interval[self.START_KEY]

    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Convert the values stored in each slice from their state-message representation into
        the converter's internal representation. `state` is updated in place and returned.

        `start`, `end` and, when present, the most recent cursor value are converted. This
        mirrors `serialize`, so all three can be compared with one another afterwards (e.g. in
        `merge_intervals`).
        """
        for stream_slice in state.get("slices", []):
            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
            if stream_slice.get(self.MOST_RECENT_RECORD_KEY) is not None:
                stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
                    stream_slice[self.MOST_RECENT_RECORD_KEY]
                )
        return state

    def serialize(
        self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
    ) -> MutableMapping[str, Any]:
        """
        Build a state message from `state`.

        Each slice's `start`, `end` and, when present, most recent cursor value are converted
        to their state-message representation. The result is tagged with `state_type`.
        `state` itself is not modified.
        """
        serialized_slices = []
        for stream_slice in state.get("slices", []):
            serialized_slice = {
                self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
                self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
            }
            most_recent_cursor_value = stream_slice.get(self.MOST_RECENT_RECORD_KEY)
            # `is not None` so that falsy but valid cursor values (e.g. integer 0) are kept.
            if most_recent_cursor_value is not None:
                serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
                    most_recent_cursor_value
                )
            serialized_slices.append(serialized_slice)
        return {"slices": serialized_slices, "state_type": state_type.value}

    @staticmethod
    def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
        """Return True if `state` is non-empty and its `state_type` is a ConcurrencyCompatibleStateType value."""
        return bool(state) and state.get("state_type") in [
            t.value for t in ConcurrencyCompatibleStateType
        ]

    @abstractmethod
    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {start: 0, end: 1617030403, finished_processing: true}]
        }
        """
        ...

    @abstractmethod
    def increment(self, value: Any) -> Any:
        """
        Increment a cursor value (e.g. a timestamp) by a single unit.

        `merge_intervals` uses this to treat back-to-back intervals as adjacent.
        """
        ...

    @abstractmethod
    def output_format(self, value: Any) -> Any:
        """
        Convert the cursor value type to a JSON valid type.
        """
        ...

    def merge_intervals(
        self, intervals: List[MutableMapping[str, Any]]
    ) -> List[MutableMapping[str, Any]]:
        """
        Compute and return a list of merged intervals, sorted by start.

        Two intervals are merged when the start of the second is at most one unit (as defined by
        the `increment` method) after the end of the first, i.e. when they overlap or are adjacent.
        A merged interval keeps the larger of the two ends and the larger of the two most recent
        cursor values, ignoring missing (None) ones.

        The input mappings are not modified: the returned intervals are shallow copies.
        """
        if not intervals:
            return []

        sorted_intervals = sorted(
            intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
        )
        # Copy so that merging does not mutate the caller's intervals.
        merged_intervals: List[MutableMapping[str, Any]] = [dict(sorted_intervals[0])]

        for current_interval in sorted_intervals[1:]:
            last_interval = merged_intervals[-1]
            last_interval_end = last_interval[self.END_KEY]

            if self.increment(last_interval_end) >= current_interval[self.START_KEY]:
                last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
                last_cursor_value = last_interval.get(self.MOST_RECENT_RECORD_KEY)
                current_cursor_value = current_interval.get(self.MOST_RECENT_RECORD_KEY)
                # Compare against None so falsy but valid cursor values (e.g. 0) are kept.
                if last_cursor_value is None:
                    merged_cursor_value = current_cursor_value
                elif current_cursor_value is None:
                    merged_cursor_value = last_cursor_value
                else:
                    merged_cursor_value = max(current_cursor_value, last_cursor_value)
                last_interval[self.MOST_RECENT_RECORD_KEY] = merged_cursor_value
            else:
                # Add a new interval if no overlap
                merged_intervals.append(dict(current_interval))

        return merged_intervals

    @abstractmethod
    def parse_value(self, value: Any) -> Any:
        """
        Parse the value of the cursor field into a comparable value.
        """
        ...

    @property
    @abstractmethod
    def zero_value(self) -> Any: ...
Values of the `state_type` key that mark a state message as concurrency-compatible.
class AbstractStreamStateConverter(ABC):
    """
    Base class for converting stream state between a stream's original (sequential) format and
    the concurrency-compatible, slice-based format used by the ConcurrentCursor.

    Subclasses define how cursor values are parsed, incremented and (de)serialized.
    """

    START_KEY = "start"
    END_KEY = "end"
    MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"

    @abstractmethod
    def _from_state_message(self, value: Any) -> Any:
        """Convert a value read from a state message into the converter's internal representation."""
        pass

    @abstractmethod
    def _to_state_message(self, value: Any) -> Any:
        """Convert an internal value into the representation written to a state message."""
        pass

    def __init__(self, is_sequential_state: bool = True):
        # When True, `convert_to_state_message` emits the stream's original (legacy) format.
        self._is_sequential_state = is_sequential_state

    def convert_to_state_message(
        self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
    ) -> MutableMapping[str, Any]:
        """
        Convert the state message from the concurrency-compatible format to the stream's original format.

        e.g.
        { "created": "2021-01-18T21:18:20.000Z" }

        If `stream_state` is not concurrency-compatible, or the converter was created with
        `is_sequential_state=False`, the state is serialized in the concurrency-compatible
        `date-range` format instead.

        The nested "legacy" mapping of `stream_state` is copied rather than updated in place.
        """
        if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
            # Copy so the caller's stored legacy state is not mutated as a side effect;
            # `or {}` also tolerates an explicit `"legacy": None`.
            legacy_state = dict(stream_state.get("legacy") or {})
            latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
            if latest_complete_time is not None:
                legacy_state[cursor_field.cursor_field_key] = self._to_state_message(
                    latest_complete_time
                )
            return legacy_state
        return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)

    def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
        """
        Get the latest time before which all records have been processed.

        This is the most recent cursor value of the first merged interval. If no cursor value
        was tracked for that interval, the interval's start is used instead.

        Raises:
            RuntimeError: if `slices` is empty.
        """
        if not slices:
            raise RuntimeError(
                "Expected at least one slice but there were none. This is unexpected; please contact Support."
            )
        first_interval = self.merge_intervals(slices)[0]
        most_recent_cursor_value = first_interval.get(self.MOST_RECENT_RECORD_KEY)
        # Compare against None so falsy but valid cursor values (e.g. integer 0) are kept.
        if most_recent_cursor_value is not None:
            return most_recent_cursor_value
        return first_interval[self.START_KEY]

    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Convert the values stored in each slice from their state-message representation into
        the converter's internal representation. `state` is updated in place and returned.

        `start`, `end` and, when present, the most recent cursor value are converted. This
        mirrors `serialize`, so all three can be compared with one another afterwards (e.g. in
        `merge_intervals`).
        """
        for stream_slice in state.get("slices", []):
            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
            if stream_slice.get(self.MOST_RECENT_RECORD_KEY) is not None:
                stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
                    stream_slice[self.MOST_RECENT_RECORD_KEY]
                )
        return state

    def serialize(
        self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
    ) -> MutableMapping[str, Any]:
        """
        Build a state message from `state`.

        Each slice's `start`, `end` and, when present, most recent cursor value are converted
        to their state-message representation. The result is tagged with `state_type`.
        `state` itself is not modified.
        """
        serialized_slices = []
        for stream_slice in state.get("slices", []):
            serialized_slice = {
                self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
                self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
            }
            most_recent_cursor_value = stream_slice.get(self.MOST_RECENT_RECORD_KEY)
            # `is not None` so that falsy but valid cursor values (e.g. integer 0) are kept.
            if most_recent_cursor_value is not None:
                serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
                    most_recent_cursor_value
                )
            serialized_slices.append(serialized_slice)
        return {"slices": serialized_slices, "state_type": state_type.value}

    @staticmethod
    def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
        """Return True if `state` is non-empty and its `state_type` is a ConcurrencyCompatibleStateType value."""
        return bool(state) and state.get("state_type") in [
            t.value for t in ConcurrencyCompatibleStateType
        ]

    @abstractmethod
    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {start: 0, end: 1617030403, finished_processing: true}]
        }
        """
        ...

    @abstractmethod
    def increment(self, value: Any) -> Any:
        """
        Increment a cursor value (e.g. a timestamp) by a single unit.

        `merge_intervals` uses this to treat back-to-back intervals as adjacent.
        """
        ...

    @abstractmethod
    def output_format(self, value: Any) -> Any:
        """
        Convert the cursor value type to a JSON valid type.
        """
        ...

    def merge_intervals(
        self, intervals: List[MutableMapping[str, Any]]
    ) -> List[MutableMapping[str, Any]]:
        """
        Compute and return a list of merged intervals, sorted by start.

        Two intervals are merged when the start of the second is at most one unit (as defined by
        the `increment` method) after the end of the first, i.e. when they overlap or are adjacent.
        A merged interval keeps the larger of the two ends and the larger of the two most recent
        cursor values, ignoring missing (None) ones.

        The input mappings are not modified: the returned intervals are shallow copies.
        """
        if not intervals:
            return []

        sorted_intervals = sorted(
            intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
        )
        # Copy so that merging does not mutate the caller's intervals.
        merged_intervals: List[MutableMapping[str, Any]] = [dict(sorted_intervals[0])]

        for current_interval in sorted_intervals[1:]:
            last_interval = merged_intervals[-1]
            last_interval_end = last_interval[self.END_KEY]

            if self.increment(last_interval_end) >= current_interval[self.START_KEY]:
                last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
                last_cursor_value = last_interval.get(self.MOST_RECENT_RECORD_KEY)
                current_cursor_value = current_interval.get(self.MOST_RECENT_RECORD_KEY)
                # Compare against None so falsy but valid cursor values (e.g. 0) are kept.
                if last_cursor_value is None:
                    merged_cursor_value = current_cursor_value
                elif current_cursor_value is None:
                    merged_cursor_value = last_cursor_value
                else:
                    merged_cursor_value = max(current_cursor_value, last_cursor_value)
                last_interval[self.MOST_RECENT_RECORD_KEY] = merged_cursor_value
            else:
                # Add a new interval if no overlap
                merged_intervals.append(dict(current_interval))

        return merged_intervals

    @abstractmethod
    def parse_value(self, value: Any) -> Any:
        """
        Parse the value of the cursor field into a comparable value.
        """
        ...

    @property
    @abstractmethod
    def zero_value(self) -> Any: ...
Abstract base class for converting a stream's state between its original (sequential) format and the concurrency-compatible, slice-based format used by the ConcurrentCursor.
def convert_to_state_message(
    self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
) -> MutableMapping[str, Any]:
    """
    Convert the state message from the concurrency-compatible format to the stream's original format.

    e.g.
    { "created": "2021-01-18T21:18:20.000Z" }

    If `stream_state` is not concurrency-compatible, or the converter was created with
    `is_sequential_state=False`, the state is serialized in the concurrency-compatible
    `date-range` format instead.

    The nested "legacy" mapping of `stream_state` is copied rather than updated in place.
    """
    if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
        # Copy so the caller's stored legacy state is not mutated as a side effect;
        # `or {}` also tolerates an explicit `"legacy": None`.
        legacy_state = dict(stream_state.get("legacy") or {})
        latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
        if latest_complete_time is not None:
            legacy_state[cursor_field.cursor_field_key] = self._to_state_message(
                latest_complete_time
            )
        return legacy_state
    return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)
Convert the state message from the concurrency-compatible format to the stream's original format.
e.g. { "created": "2021-01-18T21:18:20.000Z" }
def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
    """
    Convert the values stored in each slice from their state-message representation into
    the converter's internal representation. `state` is updated in place and returned.

    `start`, `end` and, when present, the most recent cursor value are converted. This
    mirrors `serialize`, which writes all three, so they can be compared with one another
    afterwards.
    """
    for stream_slice in state.get("slices", []):
        stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
        stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
        if stream_slice.get(self.MOST_RECENT_RECORD_KEY) is not None:
            stream_slice[self.MOST_RECENT_RECORD_KEY] = self._from_state_message(
                stream_slice[self.MOST_RECENT_RECORD_KEY]
            )
    return state
Convert the values stored in each slice (e.g. its `start` and `end` boundaries) from their state-message representation into the converter's internal representation.
def serialize(
    self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
) -> MutableMapping[str, Any]:
    """
    Build a state message from `state`.

    Each slice's `start`, `end` and, when present, most recent cursor value are converted
    to their state-message representation. The result is tagged with `state_type`.
    `state` itself is not modified.
    """
    serialized_slices = []
    for stream_slice in state.get("slices", []):
        serialized_slice = {
            self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
            self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
        }
        most_recent_cursor_value = stream_slice.get(self.MOST_RECENT_RECORD_KEY)
        # `is not None` so that falsy but valid cursor values (e.g. integer 0) are kept.
        if most_recent_cursor_value is not None:
            serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
                most_recent_cursor_value
            )
        serialized_slices.append(serialized_slice)
    return {"slices": serialized_slices, "state_type": state_type.value}
Convert each slice's `start`, `end` and, when present, most recent cursor value to their state-message representation, and tag the result with the given `state_type`.
@abstractmethod
def convert_from_sequential_state(
    self,
    cursor_field: "CursorField",
    stream_state: MutableMapping[str, Any],
    start: Optional[Any],
) -> Tuple[Any, MutableMapping[str, Any]]:
    """
    Turn `stream_state` into the state format the ConcurrentCursor expects.

    Returns a two-element tuple whose second element is the converted state; subclasses
    define the meaning of the first element.

    `cursor_field` is slated for deprecation, because only the sequential state format needs it.

    Example of the converted state:
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {"start": 0, "end": 1617030403, "finished_processing": true}
            ]
        }
    """
    ...
Convert the state message to the format required by the ConcurrentCursor.
e.g. { "state_type": ConcurrencyCompatibleStateType.date_range.value, "metadata": { … }, "slices": [ {start: 0, end: 1617030403, finished_processing: true}] }
@abstractmethod
def increment(self, value: Any) -> Any:
    """
    Return `value` advanced by the smallest unit of the cursor type (e.g. one unit of a
    timestamp). `merge_intervals` relies on this to treat back-to-back intervals as adjacent.
    """
    ...
Increment a cursor value (e.g. a timestamp) by a single unit.
@abstractmethod
def output_format(self, value: Any) -> Any:
    """
    Return `value` converted from the cursor's own type into a type that can be
    represented in JSON.
    """
    ...
Convert the cursor value type to a JSON valid type.
def merge_intervals(
    self, intervals: List[MutableMapping[str, Any]]
) -> List[MutableMapping[str, Any]]:
    """
    Compute and return a list of merged intervals, sorted by start.

    Two intervals are merged when the start of the second is at most one unit (as defined by
    the `increment` method) after the end of the first, i.e. when they overlap or are adjacent.
    A merged interval keeps the larger of the two ends and the larger of the two most recent
    cursor values, ignoring missing (None) ones.

    The input mappings are not modified: the returned intervals are shallow copies.
    """
    if not intervals:
        return []

    sorted_intervals = sorted(
        intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
    )
    # Copy so that merging does not mutate the caller's intervals.
    merged_intervals: List[MutableMapping[str, Any]] = [dict(sorted_intervals[0])]

    for current_interval in sorted_intervals[1:]:
        last_interval = merged_intervals[-1]
        last_interval_end = last_interval[self.END_KEY]

        if self.increment(last_interval_end) >= current_interval[self.START_KEY]:
            last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
            last_cursor_value = last_interval.get(self.MOST_RECENT_RECORD_KEY)
            current_cursor_value = current_interval.get(self.MOST_RECENT_RECORD_KEY)
            # Compare against None so falsy but valid cursor values (e.g. 0) are kept.
            if last_cursor_value is None:
                merged_cursor_value = current_cursor_value
            elif current_cursor_value is None:
                merged_cursor_value = last_cursor_value
            else:
                merged_cursor_value = max(current_cursor_value, last_cursor_value)
            last_interval[self.MOST_RECENT_RECORD_KEY] = merged_cursor_value
        else:
            # Add a new interval if no overlap
            merged_intervals.append(dict(current_interval))

    return merged_intervals
Compute and return a list of merged intervals.
Two intervals are merged when the start of the second is at most one unit (as defined by the `increment` method) after the end of the first, i.e. when they overlap or are adjacent.