airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
from airbyte_cdk.sources.declarative.models import (
    CustomIncrementalSync,
    DatetimeBasedCursor,
    SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ParentStreamConfig


def _is_already_migrated(stream_state: Mapping[str, Any]) -> bool:
    """Return True when the state already carries a top-level "states" key."""
    return "states" in stream_state


class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Converts a legacy per-partition stream state into the low-code per-partition format.

    The partition key field is read from the first parent stream config of the partition router and the
    cursor field from the cursor component; both are interpolated against the config and parameters.

    Example input state:
    {
        "13506132": {
            "last_changed": "2022-12-27T08:34:39+00:00"
        }
    }
    Example output state (for a partition field named "id"):
    {
        "states": [
            {
                "partition": {"id": "13506132"},
                "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
            }
        ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: CustomIncrementalSync | DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        Store the components and resolve the partition key field and cursor field names.

        :param partition_router: router whose first parent stream config provides the partition field
        :param cursor: cursor component whose ``cursor_field`` names the cursor key
        :param config: mapping the interpolated field names are evaluated against
        :param parameters: parameters handed to the string interpolation
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters

        # Both field names may be interpolated strings, so evaluate them once up front.
        partition_field_template = InterpolatedString.create(
            self._get_partition_field(partition_router), parameters=parameters
        )
        self._partition_key_field = partition_field_template.eval(config)

        cursor_field_template = InterpolatedString.create(
            cursor.cursor_field, parameters=parameters
        )
        self._cursor_field = cursor_field_template.eval(config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """Return the ``partition_field`` of the router's first parent stream config."""
        first_parent_config = partition_router.parent_stream_configs[0]

        # Custom components return their properties as a dictionary instead of a
        # ParentStreamConfig model, so the field is looked up by key in that case.
        if isinstance(first_parent_config, ParentStreamConfig):
            return first_parent_config.partition_field
        return first_parent_config.get("partition_field")  # type: ignore # parent config might be a dict for custom components

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check whether ``stream_state`` is in the legacy format and should be migrated.

        The expected legacy format is
            "<parent_key_id>" : {
                "<cursor_field>" : "<cursor_value>"
            }

        :param stream_state: the stream state to inspect
        :return: False if the state is already migrated, if the router does not have exactly one
            parent stream, or if any dict-valued entry is not a single-key dict keyed by the
            cursor field. True otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        parent_stream_configs = self._partition_router.parent_stream_configs  # type: ignore # custom partition will introduce this attribute if needed
        if len(parent_stream_configs) != 1:
            # There should be exactly one parent stream
            return False

        for partition_state in stream_state.values():
            # Only dict-valued entries are validated; other values are let through.
            if not isinstance(partition_state, dict):
                continue
            # Each partition state must hold the cursor field as its one and only key.
            if list(partition_state.keys()) != [self._cursor_field]:
                return False
        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert a legacy state into the per-partition format.

        Every top-level key becomes a partition value and its associated value becomes that
        partition's cursor. Assumes ``should_migrate(stream_state)`` returned True.

        :param stream_state: the legacy stream state to migrate
        :return: a mapping with a single "states" key listing one entry per partition
        """
        return {
            "states": [
                {"partition": {self._partition_key_field: partition_id}, "cursor": cursor_state}
                for partition_id, cursor_state in stream_state.items()
            ]
        }
class
LegacyToPerPartitionStateMigration(airbyte_cdk.sources.declarative.migrations.state_migration.StateMigration):
class LegacyToPerPartitionStateMigration(StateMigration):
    """
    Converts a legacy per-partition stream state into the low-code per-partition format.

    The partition key field is read from the first parent stream config of the partition router and the
    cursor field from the cursor component; both are interpolated against the config and parameters.

    Example input state:
    {
        "13506132": {
            "last_changed": "2022-12-27T08:34:39+00:00"
        }
    }
    Example output state (for a partition field named "id"):
    {
        "states": [
            {
                "partition": {"id": "13506132"},
                "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"}
            }
        ]
    }
    """

    def __init__(
        self,
        partition_router: SubstreamPartitionRouter,
        cursor: CustomIncrementalSync | DatetimeBasedCursor,
        config: Mapping[str, Any],
        parameters: Mapping[str, Any],
    ):
        """
        Store the components and resolve the partition key field and cursor field names.

        :param partition_router: router whose first parent stream config provides the partition field
        :param cursor: cursor component whose ``cursor_field`` names the cursor key
        :param config: mapping the interpolated field names are evaluated against
        :param parameters: parameters handed to the string interpolation
        """
        self._partition_router = partition_router
        self._cursor = cursor
        self._config = config
        self._parameters = parameters

        # Both field names may be interpolated strings, so evaluate them once up front.
        partition_field_template = InterpolatedString.create(
            self._get_partition_field(partition_router), parameters=parameters
        )
        self._partition_key_field = partition_field_template.eval(config)

        cursor_field_template = InterpolatedString.create(
            cursor.cursor_field, parameters=parameters
        )
        self._cursor_field = cursor_field_template.eval(config)

    def _get_partition_field(self, partition_router: SubstreamPartitionRouter) -> str:
        """Return the ``partition_field`` of the router's first parent stream config."""
        first_parent_config = partition_router.parent_stream_configs[0]

        # Custom components return their properties as a dictionary instead of a
        # ParentStreamConfig model, so the field is looked up by key in that case.
        if isinstance(first_parent_config, ParentStreamConfig):
            return first_parent_config.partition_field
        return first_parent_config.get("partition_field")  # type: ignore # parent config might be a dict for custom components

    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check whether ``stream_state`` is in the legacy format and should be migrated.

        The expected legacy format is
            "<parent_key_id>" : {
                "<cursor_field>" : "<cursor_value>"
            }

        :param stream_state: the stream state to inspect
        :return: False if the state is already migrated, if the router does not have exactly one
            parent stream, or if any dict-valued entry is not a single-key dict keyed by the
            cursor field. True otherwise.
        """
        if _is_already_migrated(stream_state):
            return False

        parent_stream_configs = self._partition_router.parent_stream_configs  # type: ignore # custom partition will introduce this attribute if needed
        if len(parent_stream_configs) != 1:
            # There should be exactly one parent stream
            return False

        for partition_state in stream_state.values():
            # Only dict-valued entries are validated; other values are let through.
            if not isinstance(partition_state, dict):
                continue
            # Each partition state must hold the cursor field as its one and only key.
            if list(partition_state.keys()) != [self._cursor_field]:
                return False
        return True

    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert a legacy state into the per-partition format.

        Every top-level key becomes a partition value and its associated value becomes that
        partition's cursor. Assumes ``should_migrate(stream_state)`` returned True.

        :param stream_state: the legacy stream state to migrate
        :return: a mapping with a single "states" key listing one entry per partition
        """
        return {
            "states": [
                {"partition": {self._partition_key_field: partition_id}, "cursor": cursor_state}
                for partition_id, cursor_state in stream_state.items()
            ]
        }
Transforms the input state for per-partitioned streams from the legacy format to the low-code format. The cursor field and partition ID fields are automatically extracted from the stream's DatetimeBasedCursor and SubstreamPartitionRouter.
Example input state: { "13506132": { "last_changed": "2022-12-27T08:34:39+00:00" } } Example output state: { "states": [ { "partition": {"id": "13506132"}, "cursor": {"last_changed": "2022-12-27T08:34:39+00:00"} } ] }
LegacyToPerPartitionStateMigration( partition_router: airbyte_cdk.sources.declarative.models.declarative_component_schema.SubstreamPartitionRouter, cursor: airbyte_cdk.sources.declarative.models.declarative_component_schema.CustomIncrementalSync | airbyte_cdk.sources.declarative.models.declarative_component_schema.DatetimeBasedCursor, config: Mapping[str, Any], parameters: Mapping[str, Any])
def __init__(
    self,
    partition_router: SubstreamPartitionRouter,
    cursor: CustomIncrementalSync | DatetimeBasedCursor,
    config: Mapping[str, Any],
    parameters: Mapping[str, Any],
):
    """
    Store the components and resolve the partition key field and cursor field names.

    :param partition_router: router passed to ``_get_partition_field`` to obtain the partition field
    :param cursor: cursor component whose ``cursor_field`` names the cursor key
    :param config: mapping the interpolated field names are evaluated against
    :param parameters: parameters handed to the string interpolation
    """
    self._partition_router = partition_router
    self._cursor = cursor
    self._config = config
    self._parameters = parameters

    # Both field names may be interpolated strings, so evaluate them once up front.
    partition_field_template = InterpolatedString.create(
        self._get_partition_field(partition_router), parameters=parameters
    )
    self._partition_key_field = partition_field_template.eval(config)

    cursor_field_template = InterpolatedString.create(
        cursor.cursor_field, parameters=parameters
    )
    self._cursor_field = cursor_field_template.eval(config)
def
should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
    """
    Check whether ``stream_state`` is in the legacy format and should be migrated.

    The expected legacy format is
        "<parent_key_id>" : {
            "<cursor_field>" : "<cursor_value>"
        }

    :param stream_state: the stream state to inspect
    :return: False if ``_is_already_migrated`` reports the state as migrated, if the router does
        not have exactly one parent stream, or if any dict-valued entry is not a single-key dict
        keyed by the cursor field. True otherwise.
    """
    if _is_already_migrated(stream_state):
        return False

    parent_stream_configs = self._partition_router.parent_stream_configs  # type: ignore # custom partition will introduce this attribute if needed
    if len(parent_stream_configs) != 1:
        # There should be exactly one parent stream
        return False

    for partition_state in stream_state.values():
        # Only dict-valued entries are validated; other values are let through.
        if not isinstance(partition_state, dict):
            continue
        # Each partition state must hold the cursor field as its one and only key.
        if list(partition_state.keys()) != [self._cursor_field]:
            return False
    return True
Check if the stream_state should be migrated
Parameters
- stream_state: The stream_state to potentially migrate
Returns
True if the state is of the expected format and should be migrated. False otherwise.
def
migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Convert a legacy state into the per-partition format.

    Every top-level key becomes a partition value (keyed by the partition key field) and its
    associated value becomes that partition's cursor. Assumes ``should_migrate(stream_state)``
    returned True.

    :param stream_state: the legacy stream state to migrate
    :return: a mapping with a single "states" key listing one entry per partition
    """
    return {
        "states": [
            {"partition": {self._partition_key_field: partition_id}, "cursor": cursor_state}
            for partition_id, cursor_state in stream_state.items()
        ]
    }
Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
Parameters
- stream_state: The stream_state to migrate
Returns
The migrated stream_state