airbyte_cdk.sources.declarative.migrations.state_migration

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2
 3from abc import abstractmethod
 4from typing import Any, Mapping
 5
 6
 7class StateMigration:
 8    @abstractmethod
 9    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
10        """
11        Check if the stream_state should be migrated
12
13        :param stream_state: The stream_state to potentially migrate
14        :return: true if the state is of the expected format and should be migrated. False otherwise.
15        """
16
17    @abstractmethod
18    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
19        """
20        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
21
22        :param stream_state: The stream_state to migrate
23        :return: The migrated stream_state
24        """
class StateMigration:
 8class StateMigration:
 9    @abstractmethod
10    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
11        """
12        Check if the stream_state should be migrated
13
14        :param stream_state: The stream_state to potentially migrate
15        :return: true if the state is of the expected format and should be migrated. False otherwise.
16        """
17
18    @abstractmethod
19    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
20        """
21        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
22
23        :param stream_state: The stream_state to migrate
24        :return: The migrated stream_state
25        """
@abstractmethod
def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
 9    @abstractmethod
10    def should_migrate(self, stream_state: Mapping[str, Any]) -> bool:
11        """
12        Check if the stream_state should be migrated
13
14        :param stream_state: The stream_state to potentially migrate
15        :return: true if the state is of the expected format and should be migrated. False otherwise.
16        """

Check whether the stream_state should be migrated.

Parameters
  • stream_state: The stream_state to potentially migrate
Returns

True if the state is of the expected format and should be migrated; False otherwise.

@abstractmethod
def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
18    @abstractmethod
19    def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
20        """
21        Migrate the stream_state. Assumes should_migrate(stream_state) returned True.
22
23        :param stream_state: The stream_state to migrate
24        :return: The migrated stream_state
25        """

Migrate the stream_state. Assumes should_migrate(stream_state) returned True.

Parameters
  • stream_state: The stream_state to migrate
Returns

The migrated stream_state