airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2
  3from typing import Any, Mapping
  4
  5from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
  6from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
  7from airbyte_cdk.sources.declarative.models import (
  8    CustomIncrementalSync,
  9    DatetimeBasedCursor,
 10    SubstreamPartitionRouter,
 11)
 12from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig
 13
 14
 15def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
 16    return "states" in stream_state
 17
 18
 19class LegacyToPerPartitionStateMigration(StateMigration):
 20    """
 21    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
 22    The cursor field and partition ID fields are automatically extracted from the stream's DatetimebasedCursor and SubstreamPartitionRouter.
 23
 24    Example input state:
 25    {
 26    "13506132": {
 27      "last_changed": "2022-12-27T08:34:39+00:00"
 28    }
 29    Example output state:
 30    {
 31      "partition": {"id": "13506132"},
 32      "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
 33    }
 34    """
 35
 36    def __init__(
 37        self,
 38        partition_router: SubstreamPartitionRouter,
 39        cursor: CustomIncrementalSync | DatetimeBasedCursor,
 40        config: Mapping[str, Any],
 41        parameters: Mapping[str, Any],
 42    ):
 43        self._partition_router = partition_router
 44        self._cursor = cursor
 45        self._config = config
 46        self._parameters = parameters
 47        self._partition_key_field = InterpolatedString.create(
 48            self._get_partition_field(self._partition_router), parameters=self._parameters
 49        ).eval(self._config)
 50        self._cursor_field = InterpolatedString.create(
 51            self._cursor.cursor_field, parameters=self._parameters
 52        ).eval(self._config)
 53
 54    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
 55        parent_stream_config = partition_router.parent_stream_configs[0]
 56
 57        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
 58        partition_field = (
 59            parent_stream_config.partition_field
 60            if isinstance(parent_stream_config, ParentStreamConfig)
 61            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
 62        )
 63
 64        return partition_field
 65
 66    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
 67        if _is_already_migrated(stream_state):
 68            return False
 69
 70        # There is exactly one parent stream
 71        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
 72        if number_of_parent_streams != 1:
 73            # There should be exactly one parent stream
 74            return False
 75        """
 76        The expected state format is
 77        "<parent_key_id>" : {
 78          "<cursor_field>" : "<cursor_value>"
 79        }
 80        """
 81        if not stream_state:
 82            return False
 83        for key, value in stream_state.items():
 84            # it is expected the internal value to be a dictionary according to docstring
 85            if not isinstance(value, dict):
 86                return False
 87            keys = list(value.keys())
 88            if len(keys) != 1:
 89                # The input partitioned state should only have one key
 90                return False
 91            if keys[0] != self._cursor_field:
 92                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
 93                return False
 94
 95        return True
 96
 97    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 98        states = [
 99            {"partition": {self._partition_key_field: key}, "cursor": value}
100            for key, value in stream_state.items()
101        ]
102        return {"states": states}
class LegacyToPerPartitionStateMigration(airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration):
 20class LegacyToPerPartitionStateMigration(StateMigration):
 21    """
 22    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
 23    The cursor field and partition ID fields are automatically extracted from the stream's DatetimebasedCursor and SubstreamPartitionRouter.
 24
 25    Example input state:
 26    {
 27    "13506132": {
 28      "last_changed": "2022-12-27T08:34:39+00:00"
 29    }
 30    Example output state:
 31    {
 32      "partition": {"id": "13506132"},
 33      "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
 34    }
 35    """
 36
 37    def __init__(
 38        self,
 39        partition_router: SubstreamPartitionRouter,
 40        cursor: CustomIncrementalSync | DatetimeBasedCursor,
 41        config: Mapping[str, Any],
 42        parameters: Mapping[str, Any],
 43    ):
 44        self._partition_router = partition_router
 45        self._cursor = cursor
 46        self._config = config
 47        self._parameters = parameters
 48        self._partition_key_field = InterpolatedString.create(
 49            self._get_partition_field(self._partition_router), parameters=self._parameters
 50        ).eval(self._config)
 51        self._cursor_field = InterpolatedString.create(
 52            self._cursor.cursor_field, parameters=self._parameters
 53        ).eval(self._config)
 54
 55    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
 56        parent_stream_config = partition_router.parent_stream_configs[0]
 57
 58        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
 59        partition_field = (
 60            parent_stream_config.partition_field
 61            if isinstance(parent_stream_config, ParentStreamConfig)
 62            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
 63        )
 64
 65        return partition_field
 66
 67    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
 68        if _is_already_migrated(stream_state):
 69            return False
 70
 71        # There is exactly one parent stream
 72        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
 73        if number_of_parent_streams != 1:
 74            # There should be exactly one parent stream
 75            return False
 76        """
 77        The expected state format is
 78        "<parent_key_id>" : {
 79          "<cursor_field>" : "<cursor_value>"
 80        }
 81        """
 82        if not stream_state:
 83            return False
 84        for key, value in stream_state.items():
 85            # it is expected the internal value to be a dictionary according to docstring
 86            if not isinstance(value, dict):
 87                return False
 88            keys = list(value.keys())
 89            if len(keys) != 1:
 90                # The input partitioned state should only have one key
 91                return False
 92            if keys[0] != self._cursor_field:
 93                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
 94                return False
 95
 96        return True
 97
 98    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 99        states = [
100            {"partition": {self._partition_key_field: key}, "cursor": value}
101            for key, value in stream_state.items()
102        ]
103        return {"states": states}

Transforms the input state for per-partitioned streams from the legacy format to the low-code format. The cursor field and partition ID field are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.

Example input state: { "13506132": { "last_changed": "2022-12-27T08:34:39+00:00" } }. Example output state: { "states": [ { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } ] }

37    def __init__(
38        self,
39        partition_router: SubstreamPartitionRouter,
40        cursor: CustomIncrementalSync | DatetimeBasedCursor,
41        config: Mapping[str, Any],
42        parameters: Mapping[str, Any],
43    ):
44        self._partition_router = partition_router
45        self._cursor = cursor
46        self._config = config
47        self._parameters = parameters
48        self._partition_key_field = InterpolatedString.create(
49            self._get_partition_field(self._partition_router), parameters=self._parameters
50        ).eval(self._config)
51        self._cursor_field = InterpolatedString.create(
52            self._cursor.cursor_field, parameters=self._parameters
53        ).eval(self._config)
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
67    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
68        if _is_already_migrated(stream_state):
69            return False
70
71        # There is exactly one parent stream
72        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
73        if number_of_parent_streams != 1:
74            # There should be exactly one parent stream
75            return False
76        """
77        The expected state format is
78        "<parent_key_id>" : {
79          "<cursor_field>" : "<cursor_value>"
80        }
81        """
82        if not stream_state:
83            return False
84        for key, value in stream_state.items():
85            # it is expected the internal value to be a dictionary according to docstring
86            if not isinstance(value, dict):
87                return False
88            keys = list(value.keys())
89            if len(keys) != 1:
90                # The input partitioned state should only have one key
91                return False
92            if keys[0] != self._cursor_field:
93                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
94                return False
95
96        return True

Check if the stream_state should be migrated

Parameters
  • stream_state: The stream_state to potentially migrate
Returns

True if the state is of the expected format and should be migrated. False otherwise.

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 98    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 99        states = [
100            {"partition": {self._partition_key_field: key}, "cursor": value}
101            for key, value in stream_state.items()
102        ]
103        return {"states": states}

Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

Parameters
  • stream_state: The stream_state to migrate
Returns

The migrated stream_state