airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from abc import ABC, abstractmethod
  6from enum import Enum
  7from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple
  8
  9if TYPE_CHECKING:
 10    from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
 11
 12
class ConcurrencyCompatibleStateType(Enum):
    """
    Values allowed for the "state_type" entry of a concurrency-compatible state.

    `AbstractStreamStateConverter.is_state_message_compatible` accepts a state only when its
    "state_type" equals one of these values, and `AbstractStreamStateConverter.serialize`
    writes the chosen member's value under that key.
    """

    # NOTE(review): presumably marks slices bounded by date/time cursor values — confirm.
    date_range = "date-range"
    # NOTE(review): presumably marks slices bounded by integer cursor values — confirm.
    integer = "integer"
 16
 17
 18class AbstractStreamStateConverter(ABC):
 19    START_KEY = "start"
 20    END_KEY = "end"
 21    MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"
 22
 23    @abstractmethod
 24    def _from_state_message(self, value: Any) -> Any:
 25        pass
 26
 27    @abstractmethod
 28    def _to_state_message(self, value: Any) -> Any:
 29        pass
 30
 31    def __init__(self, is_sequential_state: bool = True):
 32        self._is_sequential_state = is_sequential_state
 33
 34    def convert_to_state_message(
 35        self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
 36    ) -> MutableMapping[str, Any]:
 37        """
 38        Convert the state message from the concurrency-compatible format to the stream's original format.
 39
 40        e.g.
 41        { "created": "2021-01-18T21:18:20.000Z" }
 42        """
 43        if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
 44            legacy_state = stream_state.get("legacy", {})
 45            latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
 46            if latest_complete_time is not None:
 47                legacy_state.update(
 48                    {cursor_field.cursor_field_key: self._to_state_message(latest_complete_time)}
 49                )
 50            return legacy_state or {}
 51        else:
 52            return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)
 53
 54    def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
 55        """
 56        Get the latest time before which all records have been processed.
 57        """
 58        if not slices:
 59            raise RuntimeError(
 60                "Expected at least one slice but there were none. This is unexpected; please contact Support."
 61            )
 62        merged_intervals = self.merge_intervals(slices)
 63        first_interval = merged_intervals[0]
 64
 65        return first_interval.get("most_recent_cursor_value") or first_interval[self.START_KEY]
 66
 67    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
 68        """
 69        Perform any transformations needed for compatibility with the converter.
 70        """
 71        for stream_slice in state.get("slices", []):
 72            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
 73            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
 74        return state
 75
 76    def serialize(
 77        self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
 78    ) -> MutableMapping[str, Any]:
 79        """
 80        Perform any transformations needed for compatibility with the converter.
 81        """
 82        serialized_slices = []
 83        for stream_slice in state.get("slices", []):
 84            serialized_slice = {
 85                self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
 86                self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
 87            }
 88            if stream_slice.get(self.MOST_RECENT_RECORD_KEY):
 89                serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
 90                    stream_slice[self.MOST_RECENT_RECORD_KEY]
 91                )
 92            serialized_slices.append(serialized_slice)
 93        return {"slices": serialized_slices, "state_type": state_type.value}
 94
 95    @staticmethod
 96    def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
 97        return bool(state) and state.get("state_type") in [
 98            t.value for t in ConcurrencyCompatibleStateType
 99        ]
100
101    @abstractmethod
102    def convert_from_sequential_state(
103        self,
104        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
105        stream_state: MutableMapping[str, Any],
106        start: Optional[Any],
107    ) -> Tuple[Any, MutableMapping[str, Any]]:
108        """
109        Convert the state message to the format required by the ConcurrentCursor.
110
111        e.g.
112        {
113            "state_type": ConcurrencyCompatibleStateType.date_range.value,
114            "metadata": { … },
115            "slices": [
116                {starts: 0, end: 1617030403, finished_processing: true}]
117        }
118        """
119        ...
120
121    @abstractmethod
122    def increment(self, value: Any) -> Any:
123        """
124        Increment a timestamp by a single unit.
125        """
126        ...
127
128    @abstractmethod
129    def output_format(self, value: Any) -> Any:
130        """
131        Convert the cursor value type to a JSON valid type.
132        """
133        ...
134
135    def merge_intervals(
136        self, intervals: List[MutableMapping[str, Any]]
137    ) -> List[MutableMapping[str, Any]]:
138        """
139        Compute and return a list of merged intervals.
140
141        Intervals may be merged if the start time of the second interval is 1 unit or less (as defined by the
142        `increment` method) than the end time of the first interval.
143        """
144        if not intervals:
145            return []
146
147        sorted_intervals = sorted(
148            intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
149        )
150        merged_intervals = [sorted_intervals[0]]
151
152        for current_interval in sorted_intervals[1:]:
153            last_interval = merged_intervals[-1]
154            last_interval_end = last_interval[self.END_KEY]
155            current_interval_start = current_interval[self.START_KEY]
156
157            if self.increment(last_interval_end) >= current_interval_start:
158                last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
159                last_interval_cursor_value = last_interval.get("most_recent_cursor_value")
160                current_interval_cursor_value = current_interval.get("most_recent_cursor_value")
161
162                last_interval["most_recent_cursor_value"] = (
163                    max(current_interval_cursor_value, last_interval_cursor_value)
164                    if current_interval_cursor_value and last_interval_cursor_value
165                    else current_interval_cursor_value or last_interval_cursor_value
166                )
167            else:
168                # Add a new interval if no overlap
169                merged_intervals.append(current_interval)
170
171        return merged_intervals
172
173    @abstractmethod
174    def parse_value(self, value: Any) -> Any:
175        """
176        Parse the value of the cursor field into a comparable value.
177        """
178        ...
179
180    @property
181    @abstractmethod
182    def zero_value(self) -> Any: ...
class ConcurrencyCompatibleStateType(enum.Enum):
class ConcurrencyCompatibleStateType(Enum):
    """
    Values allowed for the "state_type" entry of a concurrency-compatible state.

    `AbstractStreamStateConverter.is_state_message_compatible` accepts a state only when its
    "state_type" equals one of these values, and `AbstractStreamStateConverter.serialize`
    writes the chosen member's value under that key.
    """

    # NOTE(review): presumably marks slices bounded by date/time cursor values — confirm.
    date_range = "date-range"
    # NOTE(review): presumably marks slices bounded by integer cursor values — confirm.
    integer = "integer"

An enumeration.

date_range = <ConcurrencyCompatibleStateType.date_range: 'date-range'>
class AbstractStreamStateConverter(abc.ABC):
 19class AbstractStreamStateConverter(ABC):
 20    START_KEY = "start"
 21    END_KEY = "end"
 22    MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"
 23
 24    @abstractmethod
 25    def _from_state_message(self, value: Any) -> Any:
 26        pass
 27
 28    @abstractmethod
 29    def _to_state_message(self, value: Any) -> Any:
 30        pass
 31
 32    def __init__(self, is_sequential_state: bool = True):
 33        self._is_sequential_state = is_sequential_state
 34
 35    def convert_to_state_message(
 36        self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
 37    ) -> MutableMapping[str, Any]:
 38        """
 39        Convert the state message from the concurrency-compatible format to the stream's original format.
 40
 41        e.g.
 42        { "created": "2021-01-18T21:18:20.000Z" }
 43        """
 44        if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
 45            legacy_state = stream_state.get("legacy", {})
 46            latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
 47            if latest_complete_time is not None:
 48                legacy_state.update(
 49                    {cursor_field.cursor_field_key: self._to_state_message(latest_complete_time)}
 50                )
 51            return legacy_state or {}
 52        else:
 53            return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)
 54
 55    def _get_latest_complete_time(self, slices: List[MutableMapping[str, Any]]) -> Any:
 56        """
 57        Get the latest time before which all records have been processed.
 58        """
 59        if not slices:
 60            raise RuntimeError(
 61                "Expected at least one slice but there were none. This is unexpected; please contact Support."
 62            )
 63        merged_intervals = self.merge_intervals(slices)
 64        first_interval = merged_intervals[0]
 65
 66        return first_interval.get("most_recent_cursor_value") or first_interval[self.START_KEY]
 67
 68    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
 69        """
 70        Perform any transformations needed for compatibility with the converter.
 71        """
 72        for stream_slice in state.get("slices", []):
 73            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
 74            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
 75        return state
 76
 77    def serialize(
 78        self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
 79    ) -> MutableMapping[str, Any]:
 80        """
 81        Perform any transformations needed for compatibility with the converter.
 82        """
 83        serialized_slices = []
 84        for stream_slice in state.get("slices", []):
 85            serialized_slice = {
 86                self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
 87                self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
 88            }
 89            if stream_slice.get(self.MOST_RECENT_RECORD_KEY):
 90                serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
 91                    stream_slice[self.MOST_RECENT_RECORD_KEY]
 92                )
 93            serialized_slices.append(serialized_slice)
 94        return {"slices": serialized_slices, "state_type": state_type.value}
 95
 96    @staticmethod
 97    def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
 98        return bool(state) and state.get("state_type") in [
 99            t.value for t in ConcurrencyCompatibleStateType
100        ]
101
102    @abstractmethod
103    def convert_from_sequential_state(
104        self,
105        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
106        stream_state: MutableMapping[str, Any],
107        start: Optional[Any],
108    ) -> Tuple[Any, MutableMapping[str, Any]]:
109        """
110        Convert the state message to the format required by the ConcurrentCursor.
111
112        e.g.
113        {
114            "state_type": ConcurrencyCompatibleStateType.date_range.value,
115            "metadata": { … },
116            "slices": [
117                {starts: 0, end: 1617030403, finished_processing: true}]
118        }
119        """
120        ...
121
122    @abstractmethod
123    def increment(self, value: Any) -> Any:
124        """
125        Increment a timestamp by a single unit.
126        """
127        ...
128
129    @abstractmethod
130    def output_format(self, value: Any) -> Any:
131        """
132        Convert the cursor value type to a JSON valid type.
133        """
134        ...
135
136    def merge_intervals(
137        self, intervals: List[MutableMapping[str, Any]]
138    ) -> List[MutableMapping[str, Any]]:
139        """
140        Compute and return a list of merged intervals.
141
142        Intervals may be merged if the start time of the second interval is 1 unit or less (as defined by the
143        `increment` method) than the end time of the first interval.
144        """
145        if not intervals:
146            return []
147
148        sorted_intervals = sorted(
149            intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
150        )
151        merged_intervals = [sorted_intervals[0]]
152
153        for current_interval in sorted_intervals[1:]:
154            last_interval = merged_intervals[-1]
155            last_interval_end = last_interval[self.END_KEY]
156            current_interval_start = current_interval[self.START_KEY]
157
158            if self.increment(last_interval_end) >= current_interval_start:
159                last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
160                last_interval_cursor_value = last_interval.get("most_recent_cursor_value")
161                current_interval_cursor_value = current_interval.get("most_recent_cursor_value")
162
163                last_interval["most_recent_cursor_value"] = (
164                    max(current_interval_cursor_value, last_interval_cursor_value)
165                    if current_interval_cursor_value and last_interval_cursor_value
166                    else current_interval_cursor_value or last_interval_cursor_value
167                )
168            else:
169                # Add a new interval if no overlap
170                merged_intervals.append(current_interval)
171
172        return merged_intervals
173
174    @abstractmethod
175    def parse_value(self, value: Any) -> Any:
176        """
177        Parse the value of the cursor field into a comparable value.
178        """
179        ...
180
181    @property
182    @abstractmethod
183    def zero_value(self) -> Any: ...

Helper class that provides a standard way to create an ABC using inheritance.

START_KEY = 'start'
END_KEY = 'end'
MOST_RECENT_RECORD_KEY = 'most_recent_cursor_value'
def convert_to_state_message( self, cursor_field: airbyte_cdk.CursorField, stream_state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
35    def convert_to_state_message(
36        self, cursor_field: "CursorField", stream_state: MutableMapping[str, Any]
37    ) -> MutableMapping[str, Any]:
38        """
39        Convert the state message from the concurrency-compatible format to the stream's original format.
40
41        e.g.
42        { "created": "2021-01-18T21:18:20.000Z" }
43        """
44        if self.is_state_message_compatible(stream_state) and self._is_sequential_state:
45            legacy_state = stream_state.get("legacy", {})
46            latest_complete_time = self._get_latest_complete_time(stream_state.get("slices", []))
47            if latest_complete_time is not None:
48                legacy_state.update(
49                    {cursor_field.cursor_field_key: self._to_state_message(latest_complete_time)}
50                )
51            return legacy_state or {}
52        else:
53            return self.serialize(stream_state, ConcurrencyCompatibleStateType.date_range)

Convert the state message from the concurrency-compatible format to the stream's original format.

e.g. { "created": "2021-01-18T21:18:20.000Z" }

def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
68    def deserialize(self, state: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
69        """
70        Perform any transformations needed for compatibility with the converter.
71        """
72        for stream_slice in state.get("slices", []):
73            stream_slice[self.START_KEY] = self._from_state_message(stream_slice[self.START_KEY])
74            stream_slice[self.END_KEY] = self._from_state_message(stream_slice[self.END_KEY])
75        return state

Perform any transformations needed for compatibility with the converter.

def serialize( self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType) -> MutableMapping[str, Any]:
77    def serialize(
78        self, state: MutableMapping[str, Any], state_type: ConcurrencyCompatibleStateType
79    ) -> MutableMapping[str, Any]:
80        """
81        Perform any transformations needed for compatibility with the converter.
82        """
83        serialized_slices = []
84        for stream_slice in state.get("slices", []):
85            serialized_slice = {
86                self.START_KEY: self._to_state_message(stream_slice[self.START_KEY]),
87                self.END_KEY: self._to_state_message(stream_slice[self.END_KEY]),
88            }
89            if stream_slice.get(self.MOST_RECENT_RECORD_KEY):
90                serialized_slice[self.MOST_RECENT_RECORD_KEY] = self._to_state_message(
91                    stream_slice[self.MOST_RECENT_RECORD_KEY]
92                )
93            serialized_slices.append(serialized_slice)
94        return {"slices": serialized_slices, "state_type": state_type.value}

Convert each slice's boundaries (and its most recent cursor value, when present) to their state-message representation and return them together with the state type.

@staticmethod
def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
 96    @staticmethod
 97    def is_state_message_compatible(state: MutableMapping[str, Any]) -> bool:
 98        return bool(state) and state.get("state_type") in [
 99            t.value for t in ConcurrencyCompatibleStateType
100        ]
@abstractmethod
def convert_from_sequential_state( self, cursor_field: airbyte_cdk.CursorField, stream_state: MutableMapping[str, Any], start: Optional[Any]) -> Tuple[Any, MutableMapping[str, Any]]:
    @abstractmethod
    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.date_range.value,
            "metadata": { … },
            "slices": [
                {start: 0, end: 1617030403, finished_processing: true}]
        }

        :param cursor_field: cursor field of the stream (only needed for sequential state)
        :param stream_state: the incoming state to convert
        :param start: optional start value supplied by the caller
        :return: a 2-tuple whose second element is the converted state.
            NOTE(review): the first element is presumably the effective start value —
            confirm against the concrete converters.
        """
        ...

Convert the state message to the format required by the ConcurrentCursor.

e.g. { "state_type": ConcurrencyCompatibleStateType.date_range.value, "metadata": { … }, "slices": [ {start: 0, end: 1617030403, finished_processing: true}] }

@abstractmethod
def increment(self, value: Any) -> Any:
    @abstractmethod
    def increment(self, value: Any) -> Any:
        """
        Return `value` advanced by a single unit.

        `merge_intervals` compares the incremented end of one interval with the start of the
        next to decide whether the two touch, so the size of "a single unit" determines
        which gaps between slices are bridged.
        """
        ...

Increment a timestamp by a single unit.

@abstractmethod
def output_format(self, value: Any) -> Any:
    @abstractmethod
    def output_format(self, value: Any) -> Any:
        """
        Convert the cursor value type to a JSON valid type.

        :param value: the cursor value to convert
        :return: NOTE(review): presumably a JSON-serializable value such as a str or int;
            this method is not called inside this class — confirm against the concrete
            converters.
        """
        ...

Convert the cursor value type to a JSON valid type.

def merge_intervals( self, intervals: List[MutableMapping[str, Any]]) -> List[MutableMapping[str, Any]]:
136    def merge_intervals(
137        self, intervals: List[MutableMapping[str, Any]]
138    ) -> List[MutableMapping[str, Any]]:
139        """
140        Compute and return a list of merged intervals.
141
142        Intervals may be merged if the start time of the second interval is 1 unit or less (as defined by the
143        `increment` method) than the end time of the first interval.
144        """
145        if not intervals:
146            return []
147
148        sorted_intervals = sorted(
149            intervals, key=lambda interval: (interval[self.START_KEY], interval[self.END_KEY])
150        )
151        merged_intervals = [sorted_intervals[0]]
152
153        for current_interval in sorted_intervals[1:]:
154            last_interval = merged_intervals[-1]
155            last_interval_end = last_interval[self.END_KEY]
156            current_interval_start = current_interval[self.START_KEY]
157
158            if self.increment(last_interval_end) >= current_interval_start:
159                last_interval[self.END_KEY] = max(last_interval_end, current_interval[self.END_KEY])
160                last_interval_cursor_value = last_interval.get("most_recent_cursor_value")
161                current_interval_cursor_value = current_interval.get("most_recent_cursor_value")
162
163                last_interval["most_recent_cursor_value"] = (
164                    max(current_interval_cursor_value, last_interval_cursor_value)
165                    if current_interval_cursor_value and last_interval_cursor_value
166                    else current_interval_cursor_value or last_interval_cursor_value
167                )
168            else:
169                # Add a new interval if no overlap
170                merged_intervals.append(current_interval)
171
172        return merged_intervals

Compute and return a list of merged intervals.

Two intervals are merged when the start time of the second interval is no more than 1 unit (as defined by the increment method) after the end time of the first interval, i.e. when they overlap or are directly adjacent.

@abstractmethod
def parse_value(self, value: Any) -> Any:
    @abstractmethod
    def parse_value(self, value: Any) -> Any:
        """
        Parse the value of the cursor field into a comparable value.

        :param value: the raw cursor field value
        :return: NOTE(review): presumably the same type used for slice boundaries, so that
            results can be compared with them — confirm against the concrete converters.
        """
        ...

Parse the value of the cursor field into a comparable value.

zero_value: Any
    # Abstract read-only property that every concrete converter must provide.
    # NOTE(review): not used inside this class; presumably the lowest cursor value a converter
    # supports — confirm against the concrete implementations.
    @property
    @abstractmethod
    def zero_value(self) -> Any: ...