airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3from typing import Any, Mapping
 4
 5from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 6from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
 7from airbyte_cdk.sources.declarative.models import (
 8    CustomIncrementalSync,
 9    DatetimeBasedCursor,
10    SubstreamPartitionRouter,
11)
12from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig
13
14
15def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
16    return "states" in stream_state
17
18
class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Transforms the input state for per-partition streams from the legacy format to the low-code format.
    The cursor field and partition ID field are automatically extracted from the stream's
    DatetimeBasedCursor (or custom incremental sync) and SubstreamPartitionRouter.

    Example input state:
    {
      "13506132": {
        "last_changed": "2022-12-27T08:34:39+00:00"
      }
    }
    Example output state (assuming the parent stream's partition_field is "id"):
    {
      "states": [
        {
          "partition": {"id": "13506132"},
          "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
        }
      ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: CustomIncrementalSync | DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        :param partition_router: router whose first parent stream config supplies the partition field
        :param cursor: incremental sync component whose ``cursor_field`` names the key inside each legacy partition state
        :param config: connector config the interpolated field names are evaluated against
        :param parameters: parameters available to the interpolated field names
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters
        # Both field names are interpolated strings; resolve them once up front.
        self._partition_key_field = InterpolatedString.create(
            self._get_partition_field(self._partition_router), parameters=self._parameters
        ).eval(self._config)
        self._cursor_field = InterpolatedString.create(
            self._cursor.cursor_field, parameters=self._parameters
        ).eval(self._config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """Return the ``partition_field`` of the router's first parent stream config."""
        parent_stream_config = partition_router.parent_stream_configs[0]

        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
        partition_field = (
            parent_stream_config.partition_field
            if isinstance(parent_stream_config, ParentStreamConfig)
            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
        )

        return partition_field

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check if the stream_state should be migrated.

        The expected legacy state format is
        "<parent_key_id>" : {
          "<cursor_field>" : "<cursor_value>"
        }

        :param stream_state: The stream_state to potentially migrate
        :return: True if the state is in the expected legacy format and should be migrated, False otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        # There should be exactly one parent stream
        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
        if number_of_parent_streams != 1:
            return False

        for value in stream_state.values():
            if not isinstance(value, dict):
                # migrate() wraps every value as a partition's cursor, so a non-mapping value
                # (e.g. a legacy global state {"<cursor_field>": "<cursor_value>"}) must not be migrated.
                return False
            if list(value.keys()) != [self._cursor_field]:
                # Each partition state must contain exactly one key: the cursor field
                return False
        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

        :param stream_state: The stream_state to migrate
        :return: The migrated stream_state, of the form
            {"states": [{"partition": {<partition_field>: <key>}, "cursor": <value>}, ...]}
        """
        states = [
            {"partition": {self._partition_key_field: key}, "cursor": value}
            for key, value in stream_state.items()
        ]
        return {"states": states}
class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Transforms the input state for per-partition streams from the legacy format to the low-code format.
    The cursor field and partition ID field are automatically extracted from the stream's
    DatetimeBasedCursor (or custom incremental sync) and SubstreamPartitionRouter.

    Example input state:
    {
      "13506132": {
        "last_changed": "2022-12-27T08:34:39+00:00"
      }
    }
    Example output state (assuming the parent stream's partition_field is "id"):
    {
      "states": [
        {
          "partition": {"id": "13506132"},
          "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
        }
      ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: CustomIncrementalSync | DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        :param partition_router: router whose first parent stream config supplies the partition field
        :param cursor: incremental sync component whose ``cursor_field`` names the key inside each legacy partition state
        :param config: connector config the interpolated field names are evaluated against
        :param parameters: parameters available to the interpolated field names
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters
        # Both field names are interpolated strings; resolve them once up front.
        self._partition_key_field = InterpolatedString.create(
            self._get_partition_field(self._partition_router), parameters=self._parameters
        ).eval(self._config)
        self._cursor_field = InterpolatedString.create(
            self._cursor.cursor_field, parameters=self._parameters
        ).eval(self._config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """Return the ``partition_field`` of the router's first parent stream config."""
        parent_stream_config = partition_router.parent_stream_configs[0]

        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
        partition_field = (
            parent_stream_config.partition_field
            if isinstance(parent_stream_config, ParentStreamConfig)
            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
        )

        return partition_field

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check if the stream_state should be migrated.

        The expected legacy state format is
        "<parent_key_id>" : {
          "<cursor_field>" : "<cursor_value>"
        }

        :param stream_state: The stream_state to potentially migrate
        :return: True if the state is in the expected legacy format and should be migrated, False otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        # There should be exactly one parent stream
        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
        if number_of_parent_streams != 1:
            return False

        for value in stream_state.values():
            if not isinstance(value, dict):
                # migrate() wraps every value as a partition's cursor, so a non-mapping value
                # (e.g. a legacy global state {"<cursor_field>": "<cursor_value>"}) must not be migrated.
                return False
            if list(value.keys()) != [self._cursor_field]:
                # Each partition state must contain exactly one key: the cursor field
                return False
        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

        :param stream_state: The stream_state to migrate
        :return: The migrated stream_state, of the form
            {"states": [{"partition": {<partition_field>: <key>}, "cursor": <value>}, ...]}
        """
        states = [
            {"partition": {self._partition_key_field: key}, "cursor": value}
            for key, value in stream_state.items()
        ]
        return {"states": states}

Transforms the input state of per-partition streams from the legacy format to the low-code format. The cursor field and partition ID field are automatically extracted from the stream's DatetimeBasedCursor (or custom incremental sync) and SubstreamPartitionRouter.

Example input state: { "13506132": { "last_changed": "2022-12-27T08:34:39+00:00" } } Example output state (assuming the parent stream's partition_field is "id"): { "states": [ { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } ] }

def __init__(
    self,
    partition_router: SubstreamPartitionRouter,
    cursor: CustomIncrementalSync | DatetimeBasedCursor,
    config: Mapping[str, Any],
    parameters: Mapping[str, Any],
):
    """
    Store the stream components and resolve the partition key and cursor field names.

    :param partition_router: router whose first parent stream config supplies the partition field
    :param cursor: incremental sync component whose ``cursor_field`` names the key inside each legacy partition state
    :param config: connector config the interpolated field names are evaluated against
    :param parameters: parameters available to the interpolated field names
    """
    self._partition_router = partition_router
    self._cursor = cursor
    self._config = config
    self._parameters = parameters

    # Field names are InterpolatedStrings; evaluate each once so should_migrate/migrate
    # work with plain resolved names.
    partition_template = self._get_partition_field(self._partition_router)
    partition_field = InterpolatedString.create(partition_template, parameters=self._parameters)
    self._partition_key_field = partition_field.eval(self._config)

    cursor_template = self._cursor.cursor_field
    cursor_field = InterpolatedString.create(cursor_template, parameters=self._parameters)
    self._cursor_field = cursor_field.eval(self._config)
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
    """
    Check if the stream_state should be migrated.

    The expected legacy state format is
    "<parent_key_id>" : {
      "<cursor_field>" : "<cursor_value>"
    }

    :param stream_state: The stream_state to potentially migrate
    :return: True if the state is in the expected legacy format and should be migrated, False otherwise.
    """
    if _is_already_migrated(stream_state):
        return False

    # There should be exactly one parent stream
    number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
    if number_of_parent_streams != 1:
        return False

    for value in stream_state.values():
        if not isinstance(value, dict):
            # migrate() wraps every value as a partition's cursor, so a non-mapping value
            # (e.g. a legacy global state {"<cursor_field>": "<cursor_value>"}) must not be migrated.
            return False
        if list(value.keys()) != [self._cursor_field]:
            # Each partition state must contain exactly one key: the cursor field
            return False
    return True

Check if the stream_state should be migrated

Parameters
  • stream_state: The stream_state to potentially migrate
Returns

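True if the state is in the expected legacy format and should be migrated; False otherwise (for example, when the state is already migrated or the partition router does not have exactly one parent stream).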

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

    :param stream_state: The stream_state to migrate
    :return: The migrated stream_state, of the form
        {"states": [{"partition": {<partition_field>: <key>}, "cursor": <value>}, ...]}
    """
    partition_field = self._partition_key_field
    migrated = []
    for partition_id, cursor_value in stream_state.items():
        # Each legacy top-level key becomes a partition; its value is kept as that partition's cursor.
        migrated.append({"partition": {partition_field: partition_id}, "cursor": cursor_value})
    return {"states": migrated}

Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

Parameters
  • stream_state: The stream_state to migrate
Returns

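The migrated stream_state, of the form {"states": [{"partition": {<partition_field>: <legacy key>}, "cursor": <legacy value>}, ...]}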