airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2
  3from typing import Any, Mapping
  4
  5from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
  6from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
  7from airbyte_cdk.sources.declarative.models import (
  8    DatetimeBasedCursor,
  9    SubstreamPartitionRouter,
 10)
 11from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig
 12
 13
 14def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
 15    return "states" in stream_state
 16
 17
 18class LegacyToPerPartitionStateMigration(StateMigration):
 19    """
 20    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
 21    The cursor field and partition ID fields are automatically extracted from the stream's DatetimebasedCursor and SubstreamPartitionRouter.
 22
 23    Example input state:
 24    {
 25    "13506132": {
 26      "last_changed": "2022-12-27T08:34:39+00:00"
 27    }
 28    Example output state:
 29    {
 30      "partition": {"id": "13506132"},
 31      "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
 32    }
 33    """
 34
 35    def __init__(
 36        self,
 37        partition_router: SubstreamPartitionRouter,
 38        cursor: DatetimeBasedCursor,
 39        config: Mapping[str, Any],
 40        parameters: Mapping[str, Any],
 41    ):
 42        self._partition_router = partition_router
 43        self._cursor = cursor
 44        self._config = config
 45        self._parameters = parameters
 46        self._partition_key_field = InterpolatedString.create(
 47            self._get_partition_field(self._partition_router), parameters=self._parameters
 48        ).eval(self._config)
 49        self._cursor_field = InterpolatedString.create(
 50            self._cursor.cursor_field, parameters=self._parameters
 51        ).eval(self._config)
 52
 53    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
 54        parent_stream_config = partition_router.parent_stream_configs[0]
 55
 56        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
 57        partition_field = (
 58            parent_stream_config.partition_field
 59            if isinstance(parent_stream_config, ParentStreamConfig)
 60            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
 61        )
 62
 63        return partition_field
 64
 65    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
 66        if _is_already_migrated(stream_state):
 67            return False
 68
 69        # There is exactly one parent stream
 70        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
 71        if number_of_parent_streams != 1:
 72            # There should be exactly one parent stream
 73            return False
 74        """
 75        The expected state format is
 76        "<parent_key_id>" : {
 77          "<cursor_field>" : "<cursor_value>"
 78        }
 79        """
 80        if not stream_state:
 81            return False
 82        for key, value in stream_state.items():
 83            # it is expected the internal value to be a dictionary according to docstring
 84            if not isinstance(value, dict):
 85                return False
 86            keys = list(value.keys())
 87            if len(keys) != 1:
 88                # The input partitioned state should only have one key
 89                return False
 90            if keys[0] != self._cursor_field:
 91                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
 92                return False
 93
 94        return True
 95
 96    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 97        states = [
 98            {"partition": {self._partition_key_field: key}, "cursor": value}
 99            for key, value in stream_state.items()
100        ]
101        return {"states": states}
class LegacyToPerPartitionStateMigration(airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration):
 19class LegacyToPerPartitionStateMigration(StateMigration):
 20    """
 21    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
 22    The cursor field and partition ID fields are automatically extracted from the stream's DatetimebasedCursor and SubstreamPartitionRouter.
 23
 24    Example input state:
 25    {
 26    "13506132": {
 27      "last_changed": "2022-12-27T08:34:39+00:00"
 28    }
 29    Example output state:
 30    {
 31      "partition": {"id": "13506132"},
 32      "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
 33    }
 34    """
 35
 36    def __init__(
 37        self,
 38        partition_router: SubstreamPartitionRouter,
 39        cursor: DatetimeBasedCursor,
 40        config: Mapping[str, Any],
 41        parameters: Mapping[str, Any],
 42    ):
 43        self._partition_router = partition_router
 44        self._cursor = cursor
 45        self._config = config
 46        self._parameters = parameters
 47        self._partition_key_field = InterpolatedString.create(
 48            self._get_partition_field(self._partition_router), parameters=self._parameters
 49        ).eval(self._config)
 50        self._cursor_field = InterpolatedString.create(
 51            self._cursor.cursor_field, parameters=self._parameters
 52        ).eval(self._config)
 53
 54    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
 55        parent_stream_config = partition_router.parent_stream_configs[0]
 56
 57        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
 58        partition_field = (
 59            parent_stream_config.partition_field
 60            if isinstance(parent_stream_config, ParentStreamConfig)
 61            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
 62        )
 63
 64        return partition_field
 65
 66    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
 67        if _is_already_migrated(stream_state):
 68            return False
 69
 70        # There is exactly one parent stream
 71        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
 72        if number_of_parent_streams != 1:
 73            # There should be exactly one parent stream
 74            return False
 75        """
 76        The expected state format is
 77        "<parent_key_id>" : {
 78          "<cursor_field>" : "<cursor_value>"
 79        }
 80        """
 81        if not stream_state:
 82            return False
 83        for key, value in stream_state.items():
 84            # it is expected the internal value to be a dictionary according to docstring
 85            if not isinstance(value, dict):
 86                return False
 87            keys = list(value.keys())
 88            if len(keys) != 1:
 89                # The input partitioned state should only have one key
 90                return False
 91            if keys[0] != self._cursor_field:
 92                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
 93                return False
 94
 95        return True
 96
 97    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 98        states = [
 99            {"partition": {self._partition_key_field: key}, "cursor": value}
100            for key, value in stream_state.items()
101        ]
102        return {"states": states}

Transforms the input state for per-partitioned streams from the legacy format to the low-code format. The cursor field and partition ID fields are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.

Example input state: { "13506132": { "last_changed": "2022-12-27T08:34:39+00:00" } } Example output state: { "states": [ { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } ] }

LegacyToPerPartitionStateMigration( partition_router: airbyte_cdk.sources.declarative.models.declarative_component_schema.SubstreamPartitionRouter, cursor: airbyte_cdk.sources.declarative.models.declarative_component_schema.DatetimeBasedCursor, config: Mapping[str, Any], parameters: Mapping[str, Any])
36    def __init__(
37        self,
38        partition_router: SubstreamPartitionRouter,
39        cursor: DatetimeBasedCursor,
40        config: Mapping[str, Any],
41        parameters: Mapping[str, Any],
42    ):
43        self._partition_router = partition_router
44        self._cursor = cursor
45        self._config = config
46        self._parameters = parameters
47        self._partition_key_field = InterpolatedString.create(
48            self._get_partition_field(self._partition_router), parameters=self._parameters
49        ).eval(self._config)
50        self._cursor_field = InterpolatedString.create(
51            self._cursor.cursor_field, parameters=self._parameters
52        ).eval(self._config)
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
66    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
67        if _is_already_migrated(stream_state):
68            return False
69
70        # There is exactly one parent stream
71        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
72        if number_of_parent_streams != 1:
73            # There should be exactly one parent stream
74            return False
75        """
76        The expected state format is
77        "<parent_key_id>" : {
78          "<cursor_field>" : "<cursor_value>"
79        }
80        """
81        if not stream_state:
82            return False
83        for key, value in stream_state.items():
84            # it is expected the internal value to be a dictionary according to docstring
85            if not isinstance(value, dict):
86                return False
87            keys = list(value.keys())
88            if len(keys) != 1:
89                # The input partitioned state should only have one key
90                return False
91            if keys[0] != self._cursor_field:
92                # Unexpected key. Found {keys[0]}. Expected {self._cursor.cursor_field}
93                return False
94
95        return True

Check if the stream_state should be migrated

Parameters
  • stream_state: The stream_state to potentially migrate
Returns

True if the state is of the expected format and should be migrated. False otherwise.

def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 97    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
 98        states = [
 99            {"partition": {self._partition_key_field: key}, "cursor": value}
100            for key, value in stream_state.items()
101        ]
102        return {"states": states}

Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

Parameters
  • stream_state: The stream_state to migrate
Returns

The migrated stream_state