airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Any, Callable, MutableMapping, Optional, Tuple
 6
 7from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
 8from airbyte_cdk.sources.streams.concurrent.state_converters.abstract_stream_state_converter import (
 9    AbstractStreamStateConverter,
10    ConcurrencyCompatibleStateType,
11)
12
13
class IncrementingCountStreamStateConverter(AbstractStreamStateConverter):
    """
    State converter for streams whose cursor is an incrementing integer count.

    Cursor values are plain integers: they are read from and written to state messages
    unchanged, the zero value is ``0`` and incrementing adds ``1``.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Return the value read from a state message unchanged."""
        return value

    def _to_state_message(self, value: Any) -> Any:
        """Return the value to be written to a state message unchanged."""
        return value

    @classmethod
    def get_end_provider(cls) -> Callable[[], float]:
        """Return a zero-argument callable that always yields ``float("inf")``."""
        return lambda: float("inf")

    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        When ``is_state_message_compatible`` accepts ``stream_state`` it is returned as-is.
        Otherwise the incoming state is wrapped as:

        {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": [
                {
                    self.START_KEY: <start, or the sync start when start is None>,
                    self.END_KEY: <sync start>,
                    self.MOST_RECENT_RECORD_KEY: <sync start>,
                },
            ],
            "legacy": <the original stream_state>,
        }

        :param cursor_field: cursor whose ``cursor_field_key`` is looked up in ``stream_state``
        :param stream_state: state from a prior sync, in either format
        :param start: optional configured start value
        :return: a ``(sync_start, state)`` tuple
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)
        if self.is_state_message_compatible(stream_state):
            return sync_start, stream_state

        # Create a single slice to represent the records synced during prior syncs.
        # It always ends at the sync start; it begins at the configured start when one
        # was given, otherwise start and end are identical.
        slices = [
            {
                self.START_KEY: start if start is not None else sync_start,
                self.END_KEY: sync_start,  # this may not be relevant anymore
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
        ]

        return sync_start, {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": slices,
            "legacy": stream_state,
        }

    def parse_value(self, value: int) -> int:
        """Return the cursor value unchanged."""
        return value

    @property
    def zero_value(self) -> int:
        """Zero value of the cursor (``0``); used as the sync start when no start is given."""
        return 0

    def increment(self, value: int) -> int:
        """Return ``value`` advanced by one."""
        return value + 1

    def output_format(self, value: int) -> int:
        """Return the cursor value unchanged for output."""
        return value

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[int],
    ) -> int:
        """
        Pick the value the sync should start from.

        :param cursor_field: cursor whose ``cursor_field_key`` is looked up in ``stream_state``
        :param stream_state: state from a prior sync
        :param start: optional configured start; ``zero_value`` is used when it is ``None``
        :return: the stored cursor value when present and at least the configured start,
            otherwise the configured start
        """
        sync_start = start if start is not None else self.zero_value
        prev_sync_low_water_mark: Optional[int] = stream_state.get(cursor_field.cursor_field_key)
        # Compare against None explicitly: a stored cursor of 0 is a real value and must
        # not be treated as "no previous state".
        if prev_sync_low_water_mark is not None and prev_sync_low_water_mark >= sync_start:
            return prev_sync_low_water_mark
        return sync_start
class IncrementingCountStreamStateConverter(AbstractStreamStateConverter):
    """
    State converter for streams whose cursor is an incrementing integer count.

    Cursor values are plain integers: they are read from and written to state messages
    unchanged, the zero value is ``0`` and incrementing adds ``1``.
    """

    def _from_state_message(self, value: Any) -> Any:
        """Return the value read from a state message unchanged."""
        return value

    def _to_state_message(self, value: Any) -> Any:
        """Return the value to be written to a state message unchanged."""
        return value

    @classmethod
    def get_end_provider(cls) -> Callable[[], float]:
        """Return a zero-argument callable that always yields ``float("inf")``."""
        return lambda: float("inf")

    def convert_from_sequential_state(
        self,
        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
        stream_state: MutableMapping[str, Any],
        start: Optional[Any],
    ) -> Tuple[Any, MutableMapping[str, Any]]:
        """
        Convert the state message to the format required by the ConcurrentCursor.

        When ``is_state_message_compatible`` accepts ``stream_state`` it is returned as-is.
        Otherwise the incoming state is wrapped as:

        {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": [
                {
                    self.START_KEY: <start, or the sync start when start is None>,
                    self.END_KEY: <sync start>,
                    self.MOST_RECENT_RECORD_KEY: <sync start>,
                },
            ],
            "legacy": <the original stream_state>,
        }

        :param cursor_field: cursor whose ``cursor_field_key`` is looked up in ``stream_state``
        :param stream_state: state from a prior sync, in either format
        :param start: optional configured start value
        :return: a ``(sync_start, state)`` tuple
        """
        sync_start = self._get_sync_start(cursor_field, stream_state, start)
        if self.is_state_message_compatible(stream_state):
            return sync_start, stream_state

        # Create a single slice to represent the records synced during prior syncs.
        # It always ends at the sync start; it begins at the configured start when one
        # was given, otherwise start and end are identical.
        slices = [
            {
                self.START_KEY: start if start is not None else sync_start,
                self.END_KEY: sync_start,  # this may not be relevant anymore
                self.MOST_RECENT_RECORD_KEY: sync_start,
            }
        ]

        return sync_start, {
            "state_type": ConcurrencyCompatibleStateType.integer.value,
            "slices": slices,
            "legacy": stream_state,
        }

    def parse_value(self, value: int) -> int:
        """Return the cursor value unchanged."""
        return value

    @property
    def zero_value(self) -> int:
        """Zero value of the cursor (``0``); used as the sync start when no start is given."""
        return 0

    def increment(self, value: int) -> int:
        """Return ``value`` advanced by one."""
        return value + 1

    def output_format(self, value: int) -> int:
        """Return the cursor value unchanged for output."""
        return value

    def _get_sync_start(
        self,
        cursor_field: CursorField,
        stream_state: MutableMapping[str, Any],
        start: Optional[int],
    ) -> int:
        """
        Pick the value the sync should start from.

        :param cursor_field: cursor whose ``cursor_field_key`` is looked up in ``stream_state``
        :param stream_state: state from a prior sync
        :param start: optional configured start; ``zero_value`` is used when it is ``None``
        :return: the stored cursor value when present and at least the configured start,
            otherwise the configured start
        """
        sync_start = start if start is not None else self.zero_value
        prev_sync_low_water_mark: Optional[int] = stream_state.get(cursor_field.cursor_field_key)
        # Compare against None explicitly: a stored cursor of 0 is a real value and must
        # not be treated as "no previous state".
        if prev_sync_low_water_mark is not None and prev_sync_low_water_mark >= sync_start:
            return prev_sync_low_water_mark
        return sync_start

Helper class that provides a standard way to create an ABC using inheritance.

@classmethod
def get_end_provider(cls) -> Callable[[], float]:
22    @classmethod
23    def get_end_provider(cls) -> Callable[[], float]:
24        return lambda: float("inf")
def convert_from_sequential_state( self, cursor_field: airbyte_cdk.CursorField, stream_state: MutableMapping[str, Any], start: Optional[Any]) -> Tuple[Any, MutableMapping[str, Any]]:
26    def convert_from_sequential_state(
27        self,
28        cursor_field: "CursorField",  # to deprecate as it is only needed for sequential state
29        stream_state: MutableMapping[str, Any],
30        start: Optional[Any],
31    ) -> Tuple[Any, MutableMapping[str, Any]]:
32        """
33        Convert the state message to the format required by the ConcurrentCursor.
34
35        e.g.
36        {
37            "state_type": ConcurrencyCompatibleStateType.date_range.value,
38            "metadata": { … },
39            "slices": [
40                {"start": "10", "end": "2021-01-18T21:18:20.000+00:00"},
41            ]
42        }
43        """
44        sync_start = self._get_sync_start(cursor_field, stream_state, start)
45        if self.is_state_message_compatible(stream_state):
46            return sync_start, stream_state
47
48        # Create a slice to represent the records synced during prior syncs.
49        # The start and end are the same to avoid confusion as to whether the records for this slice
50        # were actually synced
51        slices = [
52            {
53                self.START_KEY: start if start is not None else sync_start,
54                self.END_KEY: sync_start,  # this may not be relevant anymore
55                self.MOST_RECENT_RECORD_KEY: sync_start,
56            }
57        ]
58
59        return sync_start, {
60            "state_type": ConcurrencyCompatibleStateType.integer.value,
61            "slices": slices,
62            "legacy": stream_state,
63        }

Convert the state message to the format required by the ConcurrentCursor.

e.g. { "state_type": ConcurrencyCompatibleStateType.integer.value, "metadata": { … }, "slices": [ {"start": 0, "end": 10}, ] }

def parse_value(self, value: int) -> int:
65    def parse_value(self, value: int) -> int:
66        return value

Parse the value of the cursor field into a comparable value.

zero_value: int
68    @property
69    def zero_value(self) -> int:
70        return 0
def increment(self, value: int) -> int:
72    def increment(self, value: int) -> int:
73        return value + 1

Increment the cursor value by a single unit. For this converter the cursor is an integer count, so the value is increased by 1.

def output_format(self, value: int) -> int:
75    def output_format(self, value: int) -> int:
76        return value

Convert the cursor value to a JSON-valid type.