airbyte_cdk.sources.declarative.partition_routers.partition_router
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from abc import abstractmethod
from dataclasses import dataclass
from typing import Mapping, Optional

from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.types import StreamState


@dataclass
class PartitionRouter(StreamSlicer):
    """
    Base class for partition routers.
    Methods:
        set_initial_state(stream_state): Set the state of the parent streams.
        get_stream_state(): Get the state of the parent streams.
    """

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Args:
            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
                                        'parent_state' which is a dictionary of parent stream names to their corresponding state.
                Example:
                {
                    "parent_state": {
                        "parent_stream_name_1": { ... },
                        "parent_stream_name_2": { ... },
                        ...
                    }
                }
        """

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Returns:
            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
                 The returned format will be:
                 {
                     "parent_stream_name1": {
                         "last_updated": "2023-05-27T00:00:00Z"
                     },
                     "parent_stream_name2": {
                         "last_updated": "2023-05-27T00:00:00Z"
                     }
                 }
        """
@dataclass
class PartitionRouter(StreamSlicer):
Base class for partition routers.
Methods:
- set_initial_state(stream_state): Set the state of the parent streams.
- get_stream_state(): Get the state of the parent streams.
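Both methods are declared with `@abstractmethod`, so a concrete router has to define them even when it tracks no parent stream. The sketch below illustrates that with a stand-in base class so it runs without `airbyte_cdk` installed. `RouterInterface`, `NoParentRouter`, and the `StreamState` alias are hypothetical names introduced for this example, not part of the library.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Mapping, Optional

# Assumption: StreamState behaves like a plain mapping of strings to values.
StreamState = Mapping[str, Any]


class RouterInterface(ABC):
    """Hypothetical stand-in that mirrors the two abstract methods of PartitionRouter."""

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        ...

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        ...


@dataclass
class NoParentRouter(RouterInterface):
    """Hypothetical router without a parent stream: both methods are no-ops."""

    def set_initial_state(self, stream_state: StreamState) -> None:
        # Nothing to restore, because this router does not track parent streams.
        return None

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        # No parent state to report.
        return None


router = NoParentRouter()
router.set_initial_state({"parent_state": {}})
no_parent_state = router.get_stream_state()

# Instantiating the abstract base directly fails with a TypeError.
try:
    RouterInterface()
    abstract_instantiation_failed = False
except TypeError:
    abstract_instantiation_failed = True
```

With the real library, the same pattern would subclass `PartitionRouter` instead of the stand-in.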
@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
Set the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 'parent_state', which is a dictionary mapping parent stream names to their corresponding state. Example:
    {
        "parent_state": {
            "parent_stream_name_1": { ... },
            "parent_stream_name_2": { ... },
            ...
        }
    }
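As a rough illustration of the input format described above, the following sketch pulls the 'parent_state' entry out of an incoming state dictionary. The helper name `extract_parent_state` and the `last_updated` values are assumptions made for the example; falling back to an empty dictionary when the key is missing is one possible design choice, not documented library behavior.

```python
from typing import Any, Dict, Mapping


def extract_parent_state(stream_state: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of the 'parent_state' entry, or {} if it is absent or empty."""
    parent_state = stream_state.get("parent_state") or {}
    return dict(parent_state)


# Hypothetical incoming state, shaped like the documented example.
incoming_state = {
    "parent_state": {
        "parent_stream_name_1": {"last_updated": "2023-05-27T00:00:00Z"},
        "parent_stream_name_2": {"last_updated": "2023-05-27T00:00:00Z"},
    }
}

parent_state = extract_parent_state(incoming_state)
parent_names = sorted(parent_state)

# A state without the 'parent_state' key yields an empty dictionary.
empty_parent_state = extract_parent_state({})
```

A `set_initial_state` implementation could store the extracted dictionary and hand each entry to the matching parent stream.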
@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
Get the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Returns:
Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be:
    {
        "parent_stream_name1": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_stream_name2": {
            "last_updated": "2023-05-27T00:00:00Z"
        }
    }
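The two documented formats differ in one level of nesting: `set_initial_state` receives the parent states under a 'parent_state' key, while `get_stream_state` returns the inner mapping directly. The sketch below shows one way the two could round-trip. `ParentStateHolder` is a hypothetical class, and wrapping the returned mapping under 'parent_state' before the next sync is an assumption inferred from the docstrings, not confirmed library behavior.

```python
import copy
from typing import Any, Dict, Mapping, Optional


class ParentStateHolder:
    """Hypothetical object that keeps parent stream state between the two calls."""

    def __init__(self) -> None:
        self._parent_state: Dict[str, Any] = {}

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        # Keep a private deep copy of the nested 'parent_state' mapping.
        self._parent_state = copy.deepcopy(dict(stream_state.get("parent_state") or {}))

    def get_stream_state(self) -> Optional[Mapping[str, Any]]:
        # Return the inner mapping (parent stream name -> state), or None if empty.
        return copy.deepcopy(self._parent_state) or None


saved_state = {
    "parent_state": {
        "parent_stream_name1": {"last_updated": "2023-05-27T00:00:00Z"},
    }
}

holder = ParentStateHolder()
holder.set_initial_state(saved_state)
current_state = holder.get_stream_state()

# Assumed round trip: wrap the returned mapping again before the next sync.
next_input = {"parent_state": current_state}

# A holder that never received state reports None.
fresh_state = ParentStateHolder().get_stream_state()
```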