airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from typing import Any, Callable, MutableMapping, Optional, Tuple

from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
    AbstractStreamStateConverter,
    ConcurrencyCompatibleStateType,
)


class IncrementingCountStreamStateConverter(AbstractStreamStateConverter):
    """State converter for cursors that are incrementing integer counts.

    Cursor values are passed through unchanged in both directions; the zero
    value is ``0`` and ``increment`` adds one.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Return the value read from a state message unchanged."""
        return value

    def _to_state_message(self, value: Any) -> Any:
        """Return the value to be written to a state message unchanged."""
        return value

    @classmethod
    def get_end_provider(cls) -> Callable[[], float]:
        """Return a callable that yields positive infinity.

        ``float("inf")`` compares greater than every integer cursor value.
        """
        return lambda: float("inf")

    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        A state for which ``is_state_message_compatible`` is true is returned
        unchanged. Otherwise a single slice is built (keyed by ``START_KEY``,
        ``END_KEY`` and ``MOST_RECENT_RECORD_KEY``) and the original state is
        kept under ``"legacy"``.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": [
                {"start": 0, "end": 10},
            ],
            "legacy": { … },
        }

        :param cursor_field: cursor field whose key is looked up in ``stream_state``
        :param stream_state: the incoming (sequential or concurrent) state
        :param start: optional configured start value
        :return: tuple of (sync start value, state in the concurrent format)
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)
        if self.is_state_message_compatible(stream_state):
            return sync_start, stream_state

        # Create a slice to represent the records synced during prior syncs.
        # It runs from the configured start (or from sync_start when no start
        # was given) up to sync_start.
        slices = [
            {
                self.START_KEY: start if start is not None else sync_start,
                self.END_KEY: sync_start,  # this may not be relevant anymore
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
        ]

        return sync_start, {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": slices,
            "legacy": stream_state,
        }

    def parse_value(self, value: int) -> int:
        """Return the integer cursor value unchanged."""
        return value

    @property
    def zero_value(self) -> int:
        """The smallest cursor value, ``0``."""
        return 0

    def increment(self, value: int) -> int:
        """Return ``value + 1``."""
        return value + 1

    def output_format(self, value: int) -> int:
        """Return the integer cursor value unchanged."""
        return value

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[int],
    ) -> int:
        """Return the value the sync should start from.

        This is the larger of the configured ``start`` (``zero_value`` when
        ``start`` is None) and the cursor value stored in ``stream_state``
        under ``cursor_field.cursor_field_key``, when one is stored.
        """
        sync_start = start if start is not None else self.zero_value
        prev_sync_low_water_mark: Optional[int] = stream_state.get(cursor_field.cursor_field_key)
        # Compare against None explicitly: a stored cursor of 0 is a real
        # value and must not be treated as "no previous state".
        if prev_sync_low_water_mark is not None and prev_sync_low_water_mark >= sync_start:
            return prev_sync_low_water_mark
        return sync_start
class
IncrementingCountStreamStateConverter(airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter.AbstractStreamStateConverter):
class IncrementingCountStreamStateConverter(AbstractStreamStateConverter):
    """State converter for cursors that are incrementing integer counts.

    Cursor values are passed through unchanged in both directions; the zero
    value is ``0`` and ``increment`` adds one.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Return the value read from a state message unchanged."""
        return value

    def _to_state_message(self, value: Any) -> Any:
        """Return the value to be written to a state message unchanged."""
        return value

    @classmethod
    def get_end_provider(cls) -> Callable[[], float]:
        """Return a callable that yields positive infinity.

        ``float("inf")`` compares greater than every integer cursor value.
        """
        return lambda: float("inf")

    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        A state for which ``is_state_message_compatible`` is true is returned
        unchanged. Otherwise a single slice is built (keyed by ``START_KEY``,
        ``END_KEY`` and ``MOST_RECENT_RECORD_KEY``) and the original state is
        kept under ``"legacy"``.

        e.g.
        {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": [
                {"start": 0, "end": 10},
            ],
            "legacy": { … },
        }

        :param cursor_field: cursor field whose key is looked up in ``stream_state``
        :param stream_state: the incoming (sequential or concurrent) state
        :param start: optional configured start value
        :return: tuple of (sync start value, state in the concurrent format)
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)
        if self.is_state_message_compatible(stream_state):
            return sync_start, stream_state

        # Create a slice to represent the records synced during prior syncs.
        # It runs from the configured start (or from sync_start when no start
        # was given) up to sync_start.
        slices = [
            {
                self.START_KEY: start if start is not None else sync_start,
                self.END_KEY: sync_start,  # this may not be relevant anymore
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
        ]

        return sync_start, {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": slices,
            "legacy": stream_state,
        }

    def parse_value(self, value: int) -> int:
        """Return the integer cursor value unchanged."""
        return value

    @property
    def zero_value(self) -> int:
        """The smallest cursor value, ``0``."""
        return 0

    def increment(self, value: int) -> int:
        """Return ``value + 1``."""
        return value + 1

    def output_format(self, value: int) -> int:
        """Return the integer cursor value unchanged."""
        return value

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[int],
    ) -> int:
        """Return the value the sync should start from.

        This is the larger of the configured ``start`` (``zero_value`` when
        ``start`` is None) and the cursor value stored in ``stream_state``
        under ``cursor_field.cursor_field_key``, when one is stored.
        """
        sync_start = start if start is not None else self.zero_value
        prev_sync_low_water_mark: Optional[int] = stream_state.get(cursor_field.cursor_field_key)
        # Compare against None explicitly: a stored cursor of 0 is a real
        # value and must not be treated as "no previous state".
        if prev_sync_low_water_mark is not None and prev_sync_low_water_mark >= sync_start:
            return prev_sync_low_water_mark
        return sync_start
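State converter for streams whose cursor is an incrementing integer count; cursor values are passed through unchanged. It subclasses the abstract base class AbstractStreamStateConverter.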
def
convert_from_sequential_state( self, cursor_field: airbyte_cdk.CursorField, stream_state: MutableMapping[str, Any], start: Optional[Any]) -> Tuple[Any, MutableMapping[str, Any]]:
def convert_from_sequential_state(
    self,
    cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
    stream_state: MutableMapping[str, Any],
    start: Optional[Any],
) -> Tuple[Any, MutableMapping[str, Any]]:
    """
    Convert the state message to the format required by the ConcurrentCursor.

    When ``is_state_message_compatible`` accepts ``stream_state`` it is
    returned as-is. Otherwise the result wraps the incoming state:

    {
        "state_type": ConcurrencyCompatibleStateType.integer.value,
        "slices": [
            {"start": 0, "end": 10},
        ],
        "legacy": { … },
    }

    The single slice is keyed by ``START_KEY``, ``END_KEY`` and
    ``MOST_RECENT_RECORD_KEY``.

    :return: tuple of (sync start value, state in the concurrent format)
    """
    resume_from = self._get_sync_start(cursor_field, stream_state, start)

    # Nothing to convert when the state is already in the concurrent format.
    if self.is_state_message_compatible(stream_state):
        return resume_from, stream_state

    # One slice stands in for everything synced by earlier runs: it begins at
    # the configured start (or at the resume point when there is none) and
    # ends at the resume point.
    slice_start = resume_from if start is None else start
    prior_sync_slice = {
        self.START_KEY: slice_start,
        self.END_KEY: resume_from,  # this may not be relevant anymore
        self.MOST_RECENT_RECORD_KEY: resume_from,
    }

    concurrent_state = {
        "state_type": ConcurrencyCompatibleStateType.integer.value,
        "slices": [prior_sync_slice],
        "legacy": stream_state,
    }
    return resume_from, concurrent_state
Convert the state message to the format required by the ConcurrentCursor. A state that is already in the concurrent format is returned unchanged.
e.g. { "state_type": ConcurrencyCompatibleStateType.integer.value, "slices": [ {"start": 0, "end": 10}, ], "legacy": { … } }
def
parse_value(self, value: int) -> int:
Parse the value of the cursor field into a comparable value.