airbyte_cdk.sources.declarative.migrations.state_migration
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from abc import ABC, abstractmethod
from typing import Any, Mapping


class StateMigration(ABC):
    """
    Interface for migrating a stream_state from one format to another.

    The class derives from ABC so that the @abstractmethod markers below are
    actually enforced: a subclass must implement both should_migrate and
    migrate before it can be instantiated.
    """

    @abstractmethod
    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Check if the stream_state should be migrated

        :param stream_state: The stream_state to potentially migrate
        :return: True if the state is of the expected format and should be migrated. False otherwise.
        """

    @abstractmethod
    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

        :param stream_state: The stream_state to migrate
        :return: The migrated stream_state
        """
class
StateMigration:
class StateMigration:
    """
    Contract for components that convert a stream_state between formats.

    Implementations provide a check (should_migrate) and a conversion (migrate).
    """

    @abstractmethod
    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
        """
        Decide whether the given stream_state needs to be migrated.

        :param stream_state: The stream_state that may need migrating
        :return: True when the state has the expected format and should be migrated, False otherwise.
        """

    @abstractmethod
    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
        """
        Convert the stream_state. Callers are expected to have received True from
        should_migrate(stream_state) beforehand.

        :param stream_state: The stream_state to convert
        :return: The stream_state after migration
        """
@abstractmethod
def
should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
@abstractmethod
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
    """
    Decide whether the given stream_state needs to be migrated.

    :param stream_state: The stream_state that may need migrating
    :return: True when the state has the expected format and should be migrated, False otherwise.
    """
Check if the stream_state should be migrated
Parameters
- stream_state: The stream_state to potentially migrate
Returns
True if the state is of the expected format and should be migrated; False otherwise.
@abstractmethod
def
migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
@abstractmethod
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
    """
    Convert the stream_state. Callers are expected to have received True from
    should_migrate(stream_state) beforehand.

    :param stream_state: The stream_state to convert
    :return: The stream_state after migration
    """
Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
Parameters
- stream_state: The stream_state to migrate
Returns
The migrated stream_state