airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.models import (
    DatetimeBasedCursor,
    SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig


def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
    """Return True when the state already contains the top-level ``states`` key."""
    return "states" in stream_state


class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
    The cursor field and partition ID fields are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.

    Example input state:
    {
      "13506132": {
        "last_changed": "2022-12-27T08:34:39+00:00"
      }
    }
    Example output state:
    {
      "states": [
        {
          "partition": {"id": "13506132"},
          "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
        }
      ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        Store the components and resolve the partition key and cursor field names.

        Both names are passed through ``InterpolatedString.create(..., parameters=parameters)``
        and evaluated against ``config`` once, at construction time.

        :param partition_router: router whose first parent stream config provides the partition field
        :param cursor: cursor whose ``cursor_field`` names the key expected in each legacy partition state
        :param config: connector configuration used to evaluate the interpolated names
        :param parameters: parameters forwarded to the interpolated strings
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters
        self._partition_key_field = InterpolatedString.create(
            self._get_partition_field(self._partition_router), parameters=self._parameters
        ).eval(self._config)
        self._cursor_field = InterpolatedString.create(
            self._cursor.cursor_field, parameters=self._parameters
        ).eval(self._config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """
        Return the partition field declared on the router's first parent stream config.

        Raises ``IndexError`` when the router has no parent stream configs, because the
        first entry is accessed unconditionally.
        """
        parent_stream_config = partition_router.parent_stream_configs[0]

        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
        partition_field = (
            parent_stream_config.partition_field
            if isinstance(parent_stream_config, ParentStreamConfig)
            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
        )

        return partition_field

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check if the stream_state should be migrated.

        The expected legacy state format is
            "<parent_key_id>" : {
                "<cursor_field>" : "<cursor_value>"
            }

        :param stream_state: The stream_state to potentially migrate
        :return: True if the state is of the expected format and should be migrated. False otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        # The migration only applies when there is exactly one parent stream.
        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
        if number_of_parent_streams != 1:
            return False

        # An empty state has nothing to migrate.
        if not stream_state:
            return False

        # Only the per-partition values matter here; the parent ids are not inspected.
        for partition_state in stream_state.values():
            # Each partition state must be a dictionary...
            if not isinstance(partition_state, dict):
                return False
            # ...holding exactly one key...
            if len(partition_state) != 1:
                return False
            # ...and that key must be the resolved cursor field.
            if next(iter(partition_state)) != self._cursor_field:
                return False

        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

        :param stream_state: The stream_state to migrate
        :return: a mapping with a single ``states`` key listing one
            ``{"partition": ..., "cursor": ...}`` entry per key of the input state
        """
        states = [
            {"partition": {self._partition_key_field: key}, "cursor": value}
            for key, value in stream_state.items()
        ]
        return {"states": states}
class
LegacyToPerPartitionStateMigration(airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration):
class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Transforms the input state for per-partitioned streams from the legacy format to the low-code format.
    The cursor field and partition ID fields are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.

    Example input state:
    {
      "13506132": {
        "last_changed": "2022-12-27T08:34:39+00:00"
      }
    }
    Example output state:
    {
      "states": [
        {
          "partition": {"id": "13506132"},
          "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
        }
      ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        Store the components and resolve the partition key and cursor field names.

        Both names are passed through ``InterpolatedString.create(..., parameters=parameters)``
        and evaluated against ``config`` once, at construction time.

        :param partition_router: router whose first parent stream config provides the partition field
        :param cursor: cursor whose ``cursor_field`` names the key expected in each legacy partition state
        :param config: connector configuration used to evaluate the interpolated names
        :param parameters: parameters forwarded to the interpolated strings
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters
        self._partition_key_field = InterpolatedString.create(
            self._get_partition_field(self._partition_router), parameters=self._parameters
        ).eval(self._config)
        self._cursor_field = InterpolatedString.create(
            self._cursor.cursor_field, parameters=self._parameters
        ).eval(self._config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """
        Return the partition field declared on the router's first parent stream config.

        Raises ``IndexError`` when the router has no parent stream configs, because the
        first entry is accessed unconditionally.
        """
        parent_stream_config = partition_router.parent_stream_configs[0]

        # Retrieve the partition field with a condition, as properties are returned as a dictionary for custom components.
        partition_field = (
            parent_stream_config.partition_field
            if isinstance(parent_stream_config, ParentStreamConfig)
            else parent_stream_config.get("partition_field")  # type: ignore # See above comment on why parent_stream_config might be a dict
        )

        return partition_field

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check if the stream_state should be migrated.

        The expected legacy state format is
            "<parent_key_id>" : {
                "<cursor_field>" : "<cursor_value>"
            }

        :param stream_state: The stream_state to potentially migrate
        :return: True if the state is of the expected format and should be migrated. False otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        # The migration only applies when there is exactly one parent stream.
        number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
        if number_of_parent_streams != 1:
            return False

        # An empty state has nothing to migrate.
        if not stream_state:
            return False

        # Only the per-partition values matter here; the parent ids are not inspected.
        for partition_state in stream_state.values():
            # Each partition state must be a dictionary...
            if not isinstance(partition_state, dict):
                return False
            # ...holding exactly one key...
            if len(partition_state) != 1:
                return False
            # ...and that key must be the resolved cursor field.
            if next(iter(partition_state)) != self._cursor_field:
                return False

        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

        :param stream_state: The stream_state to migrate
        :return: a mapping with a single ``states`` key listing one
            ``{"partition": ..., "cursor": ...}`` entry per key of the input state
        """
        states = [
            {"partition": {self._partition_key_field: key}, "cursor": value}
            for key, value in stream_state.items()
        ]
        return {"states": states}
Transforms the input state for per-partitioned streams from the legacy format to the low-code format. The cursor field and partition ID fields are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.
Example input state: { "13506132": { "last_changed": "2022-12-27T08:34:39+00:00" } } Example output state: { "states": [ { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } ] }
LegacyToPerPartitionStateMigration( partition_router: airbyte_cdk.sources.declarative.models.declarative_component_schema.SubstreamPartitionRouter, cursor: airbyte_cdk.sources.declarative.models.declarative_component_schema.DatetimeBasedCursor, config: Mapping[str, Any], parameters: Mapping[str, Any])
def __init__(
    self,
    partition_router: SubstreamPartitionRouter,
    cursor: DatetimeBasedCursor,
    config: Mapping[str, Any],
    parameters: Mapping[str, Any],
):
    """
    Keep the given components and resolve the two field names used by the migration.

    The partition field (taken from the router via ``_get_partition_field``) and the
    cursor's ``cursor_field`` are each wrapped with ``InterpolatedString.create`` using
    ``parameters`` and then evaluated against ``config``.

    :param partition_router: router providing the partition field
    :param cursor: cursor providing the cursor field
    :param config: configuration the interpolated names are evaluated against
    :param parameters: parameters passed to the interpolated strings
    """
    self._partition_router = partition_router
    self._cursor = cursor
    self._config = config
    self._parameters = parameters

    # Resolve the key under which each parent id is stored in the migrated state.
    partition_field_template = InterpolatedString.create(
        self._get_partition_field(partition_router), parameters=parameters
    )
    self._partition_key_field = partition_field_template.eval(config)

    # Resolve the single key expected inside every legacy partition state.
    cursor_field_template = InterpolatedString.create(
        cursor.cursor_field, parameters=parameters
    )
    self._cursor_field = cursor_field_template.eval(config)
def
should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
    """
    Check if the stream_state should be migrated.

    The expected legacy state format is
        "<parent_key_id>" : {
            "<cursor_field>" : "<cursor_value>"
        }

    :param stream_state: The stream_state to potentially migrate
    :return: True if the state is of the expected format and should be migrated. False otherwise.
    """
    if _is_already_migrated(stream_state):
        return False

    # The migration only applies when there is exactly one parent stream.
    number_of_parent_streams = len(self._partition_router.parent_stream_configs)  # type: ignore # custom partition will introduce this attribute if needed
    if number_of_parent_streams != 1:
        return False

    # An empty state has nothing to migrate.
    if not stream_state:
        return False

    # Only the per-partition values matter here; the parent ids are not inspected.
    for partition_state in stream_state.values():
        # Each partition state must be a dictionary...
        if not isinstance(partition_state, dict):
            return False
        # ...holding exactly one key...
        if len(partition_state) != 1:
            return False
        # ...and that key must be the resolved cursor field.
        if next(iter(partition_state)) != self._cursor_field:
            return False

    return True
Check if the stream_state should be migrated
Parameters
- stream_state: The stream_state to potentially migrate
Returns
True if the state is in the expected legacy format and should be migrated; False otherwise.
def
migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Convert a legacy state into the per-partition layout.

    Assumes should_migrate(stream_state) returned True.

    :param stream_state: legacy state keyed by parent id
    :return: a mapping with a single ``states`` key holding one
        ``{"partition": ..., "cursor": ...}`` entry per key of ``stream_state``
    """
    partition_field = self._partition_key_field
    return {
        "states": [
            {"partition": {partition_field: parent_id}, "cursor": cursor_state}
            for parent_id, cursor_state in stream_state.items()
        ]
    }
Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
Parameters
- stream_state: The stream_state to migrate
Returns
The migrated stream_state