airbyte_cdk.sources.declarative.partition_routers.partition_router

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import abstractmethod
 6from dataclasses import dataclass
 7from typing import Mapping, Optional
 8
 9from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
10from airbyte_cdk.sources.types import StreamState
11
12
13@dataclass
14class PartitionRouter(StreamSlicer):
15    """
16    Base class for partition routers.
17    Methods:
18        set_initial_state(stream_state): Set the state of the parent streams.
19        get_stream_state(): Get the state of the parent streams.
20    """
21
22    @abstractmethod
23    def set_initial_state(self, stream_state: StreamState) -> None:
24        """
25        Set the state of the parent streams.
26
27        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
28        incrementally using the state.
29
30        Args:
31            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
32                                        'parent_state' which is a dictionary of parent stream names to their corresponding state.
33                Example:
34                {
35                    "parent_state": {
36                        "parent_stream_name_1": { ... },
37                        "parent_stream_name_2": { ... },
38                        ...
39                    }
40                }
41        """
42
43    @abstractmethod
44    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
45        """
46        Get the state of the parent streams.
47
48        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
49        incrementally using the state.
50
51        Returns:
52            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
53                 The returned format will be:
54                 {
55                     "parent_stream_name1": {
56                         "last_updated": "2023-05-27T00:00:00Z"
57                     },
58                     "parent_stream_name2": {
59                         "last_updated": "2023-05-27T00:00:00Z"
60                     }
61                 }
62        """
@dataclass
class PartitionRouter(airbyte_cdk.sources.declarative.stream_slicers.stream_slicer.StreamSlicer):
14@dataclass
15class PartitionRouter(StreamSlicer):
16    """
17    Base class for partition routers.
18    Methods:
19        set_initial_state(stream_state): Set the state of the parent streams.
20        get_stream_state(): Get the state of the parent streams.
21    """
22
23    @abstractmethod
24    def set_initial_state(self, stream_state: StreamState) -> None:
25        """
26        Set the state of the parent streams.
27
28        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
29        incrementally using the state.
30
31        Args:
32            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
33                                        'parent_state' which is a dictionary of parent stream names to their corresponding state.
34                Example:
35                {
36                    "parent_state": {
37                        "parent_stream_name_1": { ... },
38                        "parent_stream_name_2": { ... },
39                        ...
40                    }
41                }
42        """
43
44    @abstractmethod
45    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
46        """
47        Get the state of the parent streams.
48
49        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
50        incrementally using the state.
51
52        Returns:
53            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
54                 The returned format will be:
55                 {
56                     "parent_stream_name1": {
57                         "last_updated": "2023-05-27T00:00:00Z"
58                     },
59                     "parent_stream_name2": {
60                         "last_updated": "2023-05-27T00:00:00Z"
61                     }
62                 }
63        """

Base class for partition routers.

Methods:

  • set_initial_state(stream_state): Set the state of the parent streams.
  • get_stream_state(): Get the state of the parent streams.

@abstractmethod
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
23    @abstractmethod
24    def set_initial_state(self, stream_state: StreamState) -> None:
25        """
26        Set the state of the parent streams.
27
28        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
29        incrementally using the state.
30
31        Args:
32            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
33                                        'parent_state' which is a dictionary of parent stream names to their corresponding state.
34                Example:
35                {
36                    "parent_state": {
37                        "parent_stream_name_1": { ... },
38                        "parent_stream_name_2": { ... },
39                        ...
40                    }
41                }
42        """

Set the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Arguments:
  • stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 'parent_state', which is a dictionary of parent stream names to their corresponding state. Example:

    {
        "parent_state": {
            "parent_stream_name_1": { ... },
            "parent_stream_name_2": { ... },
            ...
        }
    }
@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
44    @abstractmethod
45    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
46        """
47        Get the state of the parent streams.
48
49        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
50        incrementally using the state.
51
52        Returns:
53            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
54                 The returned format will be:
55                 {
56                     "parent_stream_name1": {
57                         "last_updated": "2023-05-27T00:00:00Z"
58                     },
59                     "parent_stream_name2": {
60                         "last_updated": "2023-05-27T00:00:00Z"
61                     }
62                 }
63        """

Get the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Returns:

Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be:

    {
        "parent_stream_name1": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_stream_name2": {
            "last_updated": "2023-05-27T00:00:00Z"
        }
    }