airbyte_cdk.sources.declarative.partition_routers

 1#
 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
 6    AsyncJobPartitionRouter,
 7)
 8from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
 9    CartesianProductStreamSlicer,
10)
11from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
12    ListPartitionRouter,
13)
14from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
15from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
16    SinglePartitionRouter,
17)
18from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
19    SubstreamPartitionRouter,
20)
21
22__all__ = [
23    "AsyncJobPartitionRouter",
24    "CartesianProductStreamSlicer",
25    "ListPartitionRouter",
26    "SinglePartitionRouter",
27    "SubstreamPartitionRouter",
28    "PartitionRouter",
29]
@dataclass
class AsyncJobPartitionRouter(airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer):
20@dataclass
21class AsyncJobPartitionRouter(StreamSlicer):
22    """
23    Partition router that creates async jobs in a source API, periodically polls for job
24    completion, and supplies the completed job URL locations as stream slices so that
25    records can be extracted.
26    """
27
28    config: Config
29    parameters: InitVar[Mapping[str, Any]]
30    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
31    stream_slicer: StreamSlicer = field(
32        default_factory=lambda: SinglePartitionRouter(parameters={})
33    )
34
35    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
36        self._job_orchestrator_factory = self.job_orchestrator_factory
37        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
38        self._parameters = parameters
39
40    def stream_slices(self) -> Iterable[StreamSlice]:
41        slices = self.stream_slicer.stream_slices()
42        self._job_orchestrator = self._job_orchestrator_factory(slices)
43
44        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
45            yield StreamSlice(
46                partition=dict(completed_partition.stream_slice.partition),
47                cursor_slice=completed_partition.stream_slice.cursor_slice,
48                extra_fields={"jobs": list(completed_partition.jobs)},
49            )
50
51    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
52        """
53        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
54        be responsible for. However, this was added in because the JobOrchestrator is required to
55        retrieve records. And without defining fetch_records() on this class, we're stuck with either
56        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
57        """
58
59        if not self._job_orchestrator:
60            raise AirbyteTracedException(
61                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
62                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
63                failure_type=FailureType.system_error,
64            )
65
66        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)

Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed jobs as stream slices (each slice carries its completed jobs in the `jobs` extra field) so that records can be extracted.

AsyncJobPartitionRouter( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], job_orchestrator_factory: Callable[[Iterable[airbyte_cdk.StreamSlice]], airbyte_cdk.sources.declarative.async_job.job_orchestrator.AsyncJobOrchestrator], stream_slicer: airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
40    def stream_slices(self) -> Iterable[StreamSlice]:
41        slices = self.stream_slicer.stream_slices()
42        self._job_orchestrator = self._job_orchestrator_factory(slices)
43
44        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
45            yield StreamSlice(
46                partition=dict(completed_partition.stream_slice.partition),
47                cursor_slice=completed_partition.stream_slice.cursor_slice,
48                extra_fields={"jobs": list(completed_partition.jobs)},
49            )

Defines stream slices

Returns

An iterable of stream slices

def fetch_records( self, async_jobs: Iterable[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> Iterable[Mapping[str, Any]]:
51    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
52        """
53        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
54        be responsible for. However, this was added in because the JobOrchestrator is required to
55        retrieve records. And without defining fetch_records() on this class, we're stuck with either
56        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
57        """
58
59        if not self._job_orchestrator:
60            raise AirbyteTracedException(
61                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
62                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
63                failure_type=FailureType.system_error,
64            )
65
66        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)

This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, it was added because the JobOrchestrator is required to retrieve records; without defining fetch_records() on this class, we would have to either pass the JobOrchestrator to the AsyncRetriever or store it on multiple classes. It must be called after `stream_slices()`, which creates the job orchestrator; otherwise an AirbyteTracedException is raised.

@dataclass
class CartesianProductStreamSlicer(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 40@dataclass
 41class CartesianProductStreamSlicer(PartitionRouter):
 42    """
 43    Stream slicers that iterates over the cartesian product of input stream slicers
 44    Given 2 stream slicers with the following slices:
 45    A: [{"i": 0}, {"i": 1}, {"i": 2}]
 46    B: [{"s": "hello"}, {"s": "world"}]
 47    the resulting stream slices are
 48    [
 49        {"i": 0, "s": "hello"},
 50        {"i": 0, "s": "world"},
 51        {"i": 1, "s": "hello"},
 52        {"i": 1, "s": "world"},
 53        {"i": 2, "s": "hello"},
 54        {"i": 2, "s": "world"},
 55    ]
 56
 57    Attributes:
 58        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
 59    """
 60
 61    stream_slicers: List[PartitionRouter]
 62    parameters: InitVar[Mapping[str, Any]]
 63
 64    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 65        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)
 66
 67    def get_request_params(
 68        self,
 69        *,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        return dict(
 75            ChainMap(
 76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 77                    s.get_request_params(
 78                        stream_state=stream_state,
 79                        stream_slice=stream_slice,
 80                        next_page_token=next_page_token,
 81                    )
 82                    for s in self.stream_slicers
 83                ]
 84            )
 85        )
 86
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )
106
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )
126
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )
146
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            if len(cursor_slices) > 1:
154                raise ValueError(
155                    f"There should only be a single cursor slice. Found {cursor_slices}"
156                )
157            if cursor_slices:
158                cursor_slice = cursor_slices[0]
159            else:
160                cursor_slice = {}
161            yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
162
163    def set_initial_state(self, stream_state: StreamState) -> None:
164        """
165        Parent stream states are not supported for cartesian product stream slicer
166        """
167        pass
168
169    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
170        """
171        Parent stream states are not supported for cartesian product stream slicer
172        """
173        pass
174
175    @property
176    def logger(self) -> logging.Logger:
177        return logging.getLogger("airbyte.CartesianProductStreamSlicer")

Stream slicer that iterates over the Cartesian product of its input stream slicers. Given two stream slicers with the following slices:

A: [{"i": 0}, {"i": 1}, {"i": 2}]
B: [{"s": "hello"}, {"s": "world"}]

the resulting stream slices are:

[ {"i": 0, "s": "hello"}, {"i": 0, "s": "world"}, {"i": 1, "s": "hello"}, {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"} ]

At most one input slicer may contribute a non-empty cursor slice to any given combination; otherwise a ValueError is raised.

Attributes:
  • stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g. request headers, parameters, etc.) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts, e.g. two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which the slicers were passed to this composite slicer.
CartesianProductStreamSlicer( stream_slicers: List[PartitionRouter], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_slicers: List[PartitionRouter]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
67    def get_request_params(
68        self,
69        *,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        return dict(
75            ChainMap(
76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
77                    s.get_request_params(
78                        stream_state=stream_state,
79                        stream_slice=stream_slice,
80                        next_page_token=next_page_token,
81                    )
82                    for s in self.stream_slicers
83                ]
84            )
85        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g. you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as-is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            if len(cursor_slices) > 1:
154                raise ValueError(
155                    f"There should only be a single cursor slice. Found {cursor_slices}"
156                )
157            if cursor_slices:
158                cursor_slice = cursor_slices[0]
159            else:
160                cursor_slice = {}
161            yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

Defines stream slices

Returns

An iterable of stream slices

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
163    def set_initial_state(self, stream_state: StreamState) -> None:
164        """
165        Parent stream states are not supported for cartesian product stream slicer
166        """
167        pass

Parent stream states are not supported for the Cartesian product stream slicer, so this method is a no-op.

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
169    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
170        """
171        Parent stream states are not supported for cartesian product stream slicer
172        """
173        pass

Parent stream states are not supported for the Cartesian product stream slicer, so this method always returns None.

logger: logging.Logger
175    @property
176    def logger(self) -> logging.Logger:
177        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
@dataclass
class ListPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 18@dataclass
 19class ListPartitionRouter(PartitionRouter):
 20    """
 21    Partition router that iterates over the values of a list
 22    If values is a string, then evaluate it as literal and assert the resulting literal is a list
 23
 24    Attributes:
 25        values (Union[str, List[str]]): The values to iterate over
 26        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
 27        config (Config): The user-provided configuration as specified by the source's spec
 28        request_option (Optional[RequestOption]): The request option to configure the HTTP request
 29    """
 30
 31    values: Union[str, List[str]]
 32    cursor_field: Union[InterpolatedString, str]
 33    config: Config
 34    parameters: InitVar[Mapping[str, Any]]
 35    request_option: Optional[RequestOption] = None
 36
 37    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 38        if isinstance(self.values, str):
 39            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
 40                self.config
 41            )
 42        self._cursor_field = (
 43            InterpolatedString(string=self.cursor_field, parameters=parameters)
 44            if isinstance(self.cursor_field, str)
 45            else self.cursor_field
 46        )
 47
 48        self._cursor = None
 49
 50    def get_request_params(
 51        self,
 52        stream_state: Optional[StreamState] = None,
 53        stream_slice: Optional[StreamSlice] = None,
 54        next_page_token: Optional[Mapping[str, Any]] = None,
 55    ) -> Mapping[str, Any]:
 56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
 58
 59    def get_request_headers(
 60        self,
 61        stream_state: Optional[StreamState] = None,
 62        stream_slice: Optional[StreamSlice] = None,
 63        next_page_token: Optional[Mapping[str, Any]] = None,
 64    ) -> Mapping[str, Any]:
 65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 66        return self._get_request_option(RequestOptionType.header, stream_slice)
 67
 68    def get_request_body_data(
 69        self,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 75        return self._get_request_option(RequestOptionType.body_data, stream_slice)
 76
 77    def get_request_body_json(
 78        self,
 79        stream_state: Optional[StreamState] = None,
 80        stream_slice: Optional[StreamSlice] = None,
 81        next_page_token: Optional[Mapping[str, Any]] = None,
 82    ) -> Mapping[str, Any]:
 83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 84        return self._get_request_option(RequestOptionType.body_json, stream_slice)
 85
 86    def stream_slices(self) -> Iterable[StreamSlice]:
 87        return [
 88            StreamSlice(
 89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
 90            )
 91            for slice_value in self.values
 92        ]
 93
 94    def _get_request_option(
 95        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
 96    ) -> Mapping[str, Any]:
 97        if (
 98            self.request_option
 99            and self.request_option.inject_into == request_option_type
100            and stream_slice
101        ):
102            slice_value = stream_slice.get(self._cursor_field.eval(self.config))
103            if slice_value:
104                options: MutableMapping[str, Any] = {}
105                self.request_option.inject_into_request(options, slice_value, self.config)
106                return options
107            else:
108                return {}
109        else:
110            return {}
111
112    def set_initial_state(self, stream_state: StreamState) -> None:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass
117
118    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
119        """
120        ListPartitionRouter doesn't have parent streams
121        """
122        pass

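Partition router that iterates over the values of a list. If `values` is a string, it is evaluated as an interpolated string against the config, and the evaluated result is then iterated as the list of values. Each value produces one stream slice whose partition maps the evaluated cursor field name to that value.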

Attributes:
  • values (Union[str, List[str]]): The values to iterate over
  • cursor_field (Union[InterpolatedString, str]): The name of the cursor field
  • config (Config): The user-provided configuration as specified by the source's spec
  • request_option (Optional[RequestOption]): The request option to configure the HTTP request
ListPartitionRouter( values: Union[str, List[str]], cursor_field: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], request_option: Optional[airbyte_cdk.RequestOption] = None)
values: Union[str, List[str]]
cursor_field: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
request_option: Optional[airbyte_cdk.RequestOption] = None
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
50    def get_request_params(
51        self,
52        stream_state: Optional[StreamState] = None,
53        stream_slice: Optional[StreamSlice] = None,
54        next_page_token: Optional[Mapping[str, Any]] = None,
55    ) -> Mapping[str, Any]:
56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g. you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
59    def get_request_headers(
60        self,
61        stream_state: Optional[StreamState] = None,
62        stream_slice: Optional[StreamSlice] = None,
63        next_page_token: Optional[Mapping[str, Any]] = None,
64    ) -> Mapping[str, Any]:
65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
66        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
68    def get_request_body_data(
69        self,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
75        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as-is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
77    def get_request_body_json(
78        self,
79        stream_state: Optional[StreamState] = None,
80        stream_slice: Optional[StreamSlice] = None,
81        next_page_token: Optional[Mapping[str, Any]] = None,
82    ) -> Mapping[str, Any]:
83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
84        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden; they cannot both be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
86    def stream_slices(self) -> Iterable[StreamSlice]:
87        return [
88            StreamSlice(
89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
90            )
91            for slice_value in self.values
92        ]

Defines stream slices

Returns

An iterable of stream slices

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
112    def set_initial_state(self, stream_state: StreamState) -> None:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass

ListPartitionRouter doesn't have parent streams, so this method is a no-op.

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
118    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
119        """
120        ListPartitionRouter doesn't have parent streams
121        """
122        pass

ListPartitionRouter doesn't have parent streams, so this method always returns None.

@dataclass
class SinglePartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
13@dataclass
14class SinglePartitionRouter(PartitionRouter):
15    """Partition router returning only a stream slice"""
16
17    parameters: InitVar[Mapping[str, Any]]
18
19    def get_request_params(
20        self,
21        stream_state: Optional[StreamState] = None,
22        stream_slice: Optional[StreamSlice] = None,
23        next_page_token: Optional[Mapping[str, Any]] = None,
24    ) -> Mapping[str, Any]:
25        return {}
26
27    def get_request_headers(
28        self,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        return {}
34
35    def get_request_body_data(
36        self,
37        stream_state: Optional[StreamState] = None,
38        stream_slice: Optional[StreamSlice] = None,
39        next_page_token: Optional[Mapping[str, Any]] = None,
40    ) -> Mapping[str, Any]:
41        return {}
42
43    def get_request_body_json(
44        self,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return {}
50
51    def stream_slices(self) -> Iterable[StreamSlice]:
52        yield StreamSlice(partition={}, cursor_slice={})
53
54    def set_initial_state(self, stream_state: StreamState) -> None:
55        """
56        SinglePartitionRouter doesn't have parent streams
57        """
58        pass
59
60    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
61        """
62        SinglePartitionRouter doesn't have parent streams
63        """
64        pass

Partition router that returns exactly one stream slice, with an empty partition and an empty cursor slice.

SinglePartitionRouter(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
19    def get_request_params(
20        self,
21        stream_state: Optional[StreamState] = None,
22        stream_slice: Optional[StreamSlice] = None,
23        next_page_token: Optional[Mapping[str, Any]] = None,
24    ) -> Mapping[str, Any]:
25        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
27    def get_request_headers(
28        self,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
35    def get_request_body_data(
36        self,
37        stream_state: Optional[StreamState] = None,
38        stream_slice: Optional[StreamSlice] = None,
39        next_page_token: Optional[Mapping[str, Any]] = None,
40    ) -> Mapping[str, Any]:
41        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
43    def get_request_body_json(
44        self,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return {}

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
51    def stream_slices(self) -> Iterable[StreamSlice]:
52        yield StreamSlice(partition={}, cursor_slice={})

Defines stream slices

Returns

An iterable of stream slices

def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
54    def set_initial_state(self, stream_state: StreamState) -> None:
55        """
56        SinglePartitionRouter doesn't have parent streams
57        """
58        pass

SinglePartitionRouter doesn't have parent streams

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
60    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
61        """
62        SinglePartitionRouter doesn't have parent streams
63        """
64        pass

SinglePartitionRouter doesn't have parent streams

@dataclass
class SubstreamPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 80@dataclass
 81class SubstreamPartitionRouter(PartitionRouter):
 82    """
 83    Partition router that iterates over the parent's stream records and emits slices
 84    Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components
 85
 86    Attributes:
 87        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
 88    """
 89
 90    parent_stream_configs: List[ParentStreamConfig]
 91    config: Config
 92    parameters: InitVar[Mapping[str, Any]]
 93
 94    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 95        if not self.parent_stream_configs:
 96            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
 97        self._parameters = parameters
 98
 99    def get_request_params(
100        self,
101        stream_state: Optional[StreamState] = None,
102        stream_slice: Optional[StreamSlice] = None,
103        next_page_token: Optional[Mapping[str, Any]] = None,
104    ) -> Mapping[str, Any]:
105        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
106        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
107
108    def get_request_headers(
109        self,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
115        return self._get_request_option(RequestOptionType.header, stream_slice)
116
117    def get_request_body_data(
118        self,
119        stream_state: Optional[StreamState] = None,
120        stream_slice: Optional[StreamSlice] = None,
121        next_page_token: Optional[Mapping[str, Any]] = None,
122    ) -> Mapping[str, Any]:
123        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
124        return self._get_request_option(RequestOptionType.body_data, stream_slice)
125
126    def get_request_body_json(
127        self,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
133        return self._get_request_option(RequestOptionType.body_json, stream_slice)
134
135    def _get_request_option(
136        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
137    ) -> Mapping[str, Any]:
138        params: MutableMapping[str, Any] = {}
139        if stream_slice:
140            for parent_config in self.parent_stream_configs:
141                if (
142                    parent_config.request_option
143                    and parent_config.request_option.inject_into == option_type
144                ):
145                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
146                    value = stream_slice.get(key)
147                    if value:
148                        parent_config.request_option.inject_into_request(params, value, self.config)
149        return params
150
151    def stream_slices(self) -> Iterable[StreamSlice]:
152        """
153        Iterate over each parent stream's record and create a StreamSlice for each record.
154
155        For each stream, iterate over its stream_slices.
156        For each stream slice, iterate over each record.
157        yield a stream slice for each such records.
158
159        If a parent slice contains no record, emit a slice with parent_record=None.
160
161        The template string can interpolate the following values:
162        - parent_stream_slice: mapping representing the parent's stream slice
163        - parent_record: mapping representing the parent record
164        - parent_stream_name: string representing the parent stream name
165        """
166        if not self.parent_stream_configs:
167            yield from []
168        else:
169            for parent_stream_config in self.parent_stream_configs:
170                parent_stream = parent_stream_config.stream
171                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
172                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
173                extra_fields = None
174                if parent_stream_config.extra_fields:
175                    extra_fields = [
176                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
177                        for field_path in parent_stream_config.extra_fields
178                    ]
179
180                # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
181                # not support either substreams or RFR, but something that needs to be considered once we do
182                for parent_record in parent_stream.read_only_records():
183                    parent_partition = None
184                    # Skip non-records (eg AirbyteLogMessage)
185                    if isinstance(parent_record, AirbyteMessage):
186                        self.logger.warning(
187                            f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
188                        )
189                        if parent_record.type == MessageType.RECORD:
190                            parent_record = parent_record.record.data  # type: ignore[union-attr, assignment]  # record is always a Record
191                        else:
192                            continue
193                    elif isinstance(parent_record, Record):
194                        parent_partition = (
195                            parent_record.associated_slice.partition
196                            if parent_record.associated_slice
197                            else {}
198                        )
199                        parent_record = parent_record.data
200                    elif not isinstance(parent_record, Mapping):
201                        # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
202                        raise AirbyteTracedException(
203                            message=f"Parent stream returned records as invalid type {type(parent_record)}"
204                        )
205                    try:
206                        partition_value = dpath.get(
207                            parent_record,  # type: ignore [arg-type]
208                            parent_field,
209                        )
210                    except KeyError:
211                        continue
212
213                    # Add extra fields
214                    extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)
215
216                    if parent_stream_config.lazy_read_pointer:
217                        extracted_extra_fields = {
218                            "child_response": self._extract_child_response(
219                                parent_record,
220                                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
221                            ),
222                            **extracted_extra_fields,
223                        }
224
225                    yield StreamSlice(
226                        partition={
227                            partition_field: partition_value,
228                            "parent_slice": parent_partition or {},
229                        },
230                        cursor_slice={},
231                        extra_fields=extracted_extra_fields,
232                    )
233
234    def _extract_child_response(
235        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
236    ) -> requests.Response:
237        """Extract child records from a parent record based on lazy pointers."""
238
239        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
240            """Create a SafeResponse with the given data."""
241            response = SafeResponse()
242            response.content = json.dumps(data).encode("utf-8")
243            response.status_code = 200
244            return response
245
246        path = [path.eval(self.config) for path in pointer]
247        return _create_response(dpath.get(parent_record, path, default=[]))  # type: ignore # argunet will be a MutableMapping, given input data structure
248
249    def _extract_extra_fields(
250        self,
251        parent_record: Mapping[str, Any] | AirbyteMessage,
252        extra_fields: Optional[List[List[str]]] = None,
253    ) -> Mapping[str, Any]:
254        """
255        Extracts additional fields specified by their paths from the parent record.
256
257        Args:
258            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
259            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.
260
261        Returns:
262            Mapping[str, Any]: A dictionary containing the extracted fields.
263                               The keys are the joined field paths, and the values are the corresponding extracted values.
264        """
265        extracted_extra_fields = {}
266        if extra_fields:
267            for extra_field_path in extra_fields:
268                try:
269                    extra_field_value = dpath.get(
270                        parent_record,  # type: ignore [arg-type]
271                        extra_field_path,
272                    )
273                    self.logger.debug(
274                        f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}"
275                    )
276                except KeyError:
277                    self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}")
278                    extra_field_value = None
279                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
280        return extracted_extra_fields
281
282    def set_initial_state(self, stream_state: StreamState) -> None:
283        """
284        Set the state of the parent streams.
285
286        If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
287        This migration applies only to parent streams with incremental dependencies.
288
289        Args:
290            stream_state (StreamState): The state of the streams to be set.
291
292        Example of state format:
293        {
294            "parent_state": {
295                "parent_stream_name1": {
296                    "last_updated": "2023-05-27T00:00:00Z"
297                },
298                "parent_stream_name2": {
299                    "last_updated": "2023-05-27T00:00:00Z"
300                }
301            }
302        }
303
304        Example of migrating to parent state format:
305        - Initial state:
306        {
307            "updated_at": "2023-05-27T00:00:00Z"
308        }
309        - After migration:
310        {
311            "updated_at": "2023-05-27T00:00:00Z",
312            "parent_state": {
313                "parent_stream_name": {
314                    "parent_stream_cursor": "2023-05-27T00:00:00Z"
315                }
316            }
317        }
318        """
319        if not stream_state:
320            return
321
322        parent_state = stream_state.get("parent_state", {})
323
324        # Set state for each parent stream with an incremental dependency
325        for parent_config in self.parent_stream_configs:
326            if (
327                not parent_state.get(parent_config.stream.name, {})
328                and parent_config.incremental_dependency
329            ):
330                # Migrate child state to parent state format
331                parent_state = self._migrate_child_state_to_parent_state(stream_state)
332
333            if parent_config.incremental_dependency:
334                parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
335
336    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
337        """
338        Migrate the child or global stream state into the parent stream's state format.
339
340        This method converts the child stream state—or, if present, the global state—into a format that is
341        compatible with parent streams that use incremental synchronization. The migration occurs only for
342        parent streams with incremental dependencies. It filters out per-partition states and retains only the
343        global state in the form {cursor_field: cursor_value}.
344
345        The method supports multiple input formats:
346          - A simple global state, e.g.:
347                {"updated_at": "2023-05-27T00:00:00Z"}
348          - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
349                {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
350            In this case, the migration uses the first value from the "state" dictionary.
351          - Any per-partition state formats or other non-simple structures are ignored during migration.
352
353        Args:
354            stream_state (StreamState): The state to migrate. Expected formats include:
355                - {"updated_at": "2023-05-27T00:00:00Z"}
356                - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
357                  (In this format, only the first global state value is used, and per-partition states are ignored.)
358
359        Returns:
360            StreamState: A migrated state for parent streams in the format:
361                {
362                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
363                }
364            where each parent stream with an incremental dependency is assigned its corresponding cursor value.
365
366        Example:
367            Input: {"updated_at": "2023-05-27T00:00:00Z"}
368            Output: {
369                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
370            }
371        """
372        substream_state_values = list(stream_state.values())
373        substream_state = substream_state_values[0] if substream_state_values else {}
374
375        # Ignore per-partition states or invalid formats.
376        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
377            # If a global state is present under the key "state", use its first value.
378            if "state" in stream_state and isinstance(stream_state["state"], dict):
379                substream_state = list(stream_state["state"].values())[0]
380            else:
381                return {}
382
383        # Build the parent state for all parent streams with incremental dependencies.
384        parent_state = {}
385        if substream_state:
386            for parent_config in self.parent_stream_configs:
387                if parent_config.incremental_dependency:
388                    parent_state[parent_config.stream.name] = {
389                        parent_config.stream.cursor_field: substream_state
390                    }
391
392        return parent_state
393
394    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
395        """
396        Get the state of the parent streams.
397
398        Returns:
399            StreamState: The current state of the parent streams.
400
401        Example of state format:
402        {
403            "parent_stream_name1": {
404                "last_updated": "2023-05-27T00:00:00Z"
405            },
406            "parent_stream_name2": {
407                "last_updated": "2023-05-27T00:00:00Z"
408            }
409        }
410        """
411        parent_state = {}
412        for parent_config in self.parent_stream_configs:
413            if parent_config.incremental_dependency:
414                parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
415        return parent_state
416
417    @property
418    def logger(self) -> logging.Logger:
419        return logging.getLogger("airbyte.SubstreamPartitionRouter")

Partition router that iterates over the parent stream's records and emits slices. Each emitted slice's partition contains the partition_field value and parent_slice, so they can be accessed by other components.

Attributes:
  • parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
SubstreamPartitionRouter( parent_stream_configs: List[airbyte_cdk.ParentStreamConfig], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parent_stream_configs: List[airbyte_cdk.ParentStreamConfig]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 99    def get_request_params(
100        self,
101        stream_state: Optional[StreamState] = None,
102        stream_slice: Optional[StreamSlice] = None,
103        next_page_token: Optional[Mapping[str, Any]] = None,
104    ) -> Mapping[str, Any]:
105        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
106        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
108    def get_request_headers(
109        self,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
115        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
117    def get_request_body_data(
118        self,
119        stream_state: Optional[StreamState] = None,
120        stream_slice: Optional[StreamSlice] = None,
121        next_page_token: Optional[Mapping[str, Any]] = None,
122    ) -> Mapping[str, Any]:
123        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
124        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, the string is sent as is. If it returns a dict, the dict is converted to a URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
126    def get_request_body_json(
127        self,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
133        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
151    def stream_slices(self) -> Iterable[StreamSlice]:
152        """
153        Iterate over each parent stream's record and create a StreamSlice for each record.
154
155        For each stream, iterate over its stream_slices.
156        For each stream slice, iterate over each record.
157        yield a stream slice for each such records.
158
159        If a parent slice contains no record, emit a slice with parent_record=None.
160
161        The template string can interpolate the following values:
162        - parent_stream_slice: mapping representing the parent's stream slice
163        - parent_record: mapping representing the parent record
164        - parent_stream_name: string representing the parent stream name
165        """
166        if not self.parent_stream_configs:
167            yield from []
168        else:
169            for parent_stream_config in self.parent_stream_configs:
170                parent_stream = parent_stream_config.stream
171                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
172                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
173                extra_fields = None
174                if parent_stream_config.extra_fields:
175                    extra_fields = [
176                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
177                        for field_path in parent_stream_config.extra_fields
178                    ]
179
180                # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
181                # not support either substreams or RFR, but something that needs to be considered once we do
182                for parent_record in parent_stream.read_only_records():
183                    parent_partition = None
184                    # Skip non-records (eg AirbyteLogMessage)
185                    if isinstance(parent_record, AirbyteMessage):
186                        self.logger.warning(
187                            f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
188                        )
189                        if parent_record.type == MessageType.RECORD:
190                            parent_record = parent_record.record.data  # type: ignore[union-attr, assignment]  # record is always a Record
191                        else:
192                            continue
193                    elif isinstance(parent_record, Record):
194                        parent_partition = (
195                            parent_record.associated_slice.partition
196                            if parent_record.associated_slice
197                            else {}
198                        )
199                        parent_record = parent_record.data
200                    elif not isinstance(parent_record, Mapping):
201                        # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
202                        raise AirbyteTracedException(
203                            message=f"Parent stream returned records as invalid type {type(parent_record)}"
204                        )
205                    try:
206                        partition_value = dpath.get(
207                            parent_record,  # type: ignore [arg-type]
208                            parent_field,
209                        )
210                    except KeyError:
211                        continue
212
213                    # Add extra fields
214                    extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)
215
216                    if parent_stream_config.lazy_read_pointer:
217                        extracted_extra_fields = {
218                            "child_response": self._extract_child_response(
219                                parent_record,
220                                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
221                            ),
222                            **extracted_extra_fields,
223                        }
224
225                    yield StreamSlice(
226                        partition={
227                            partition_field: partition_value,
228                            "parent_slice": parent_partition or {},
229                        },
230                        cursor_slice={},
231                        extra_fields=extracted_extra_fields,
232                    )

Iterate over each parent stream's record and create a StreamSlice for each record.

For each parent stream, iterate over its stream slices. For each stream slice, iterate over its records and yield one stream slice per record.

Parent records that do not contain the configured parent key are skipped. A parent slice that contains no records produces no child slices.

The template string can interpolate the following values:

  • parent_stream_slice: mapping representing the parent's stream slice
  • parent_record: mapping representing the parent record
  • parent_stream_name: string representing the parent stream name
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
282    def set_initial_state(self, stream_state: StreamState) -> None:
283        """
284        Set the state of the parent streams.
285
286        If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
287        This migration applies only to parent streams with incremental dependencies.
288
289        Args:
290            stream_state (StreamState): The state of the streams to be set.
291
292        Example of state format:
293        {
294            "parent_state": {
295                "parent_stream_name1": {
296                    "last_updated": "2023-05-27T00:00:00Z"
297                },
298                "parent_stream_name2": {
299                    "last_updated": "2023-05-27T00:00:00Z"
300                }
301            }
302        }
303
304        Example of migrating to parent state format:
305        - Initial state:
306        {
307            "updated_at": "2023-05-27T00:00:00Z"
308        }
309        - After migration:
310        {
311            "updated_at": "2023-05-27T00:00:00Z",
312            "parent_state": {
313                "parent_stream_name": {
314                    "parent_stream_cursor": "2023-05-27T00:00:00Z"
315                }
316            }
317        }
318        """
319        if not stream_state:
320            return
321
322        parent_state = stream_state.get("parent_state", {})
323
324        # Set state for each parent stream with an incremental dependency
325        for parent_config in self.parent_stream_configs:
326            if (
327                not parent_state.get(parent_config.stream.name, {})
328                and parent_config.incremental_dependency
329            ):
330                # Migrate child state to parent state format
331                parent_state = self._migrate_child_state_to_parent_state(stream_state)
332
333            if parent_config.incremental_dependency:
334                parent_config.stream.state = parent_state.get(parent_config.stream.name, {})

Set the state of the parent streams.

If the parent_state key is missing from stream_state, migrate the child stream state to the parent stream's state format. This migration applies only to parent streams with incremental dependencies.

Arguments:
  • stream_state (StreamState): The state of the streams to be set.

Example of state format: { "parent_state": { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } } }

Example of migrating to parent state format:

  • Initial state: { "updated_at": "2023-05-27T00:00:00Z" }
  • After migration: { "updated_at": "2023-05-27T00:00:00Z", "parent_state": { "parent_stream_name": { "parent_stream_cursor": "2023-05-27T00:00:00Z" } } }
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
394    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
395        """
396        Get the state of the parent streams.
397
398        Returns:
399            StreamState: The current state of the parent streams.
400
401        Example of state format:
402        {
403            "parent_stream_name1": {
404                "last_updated": "2023-05-27T00:00:00Z"
405            },
406            "parent_stream_name2": {
407                "last_updated": "2023-05-27T00:00:00Z"
408            }
409        }
410        """
411        parent_state = {}
412        for parent_config in self.parent_stream_configs:
413            if parent_config.incremental_dependency:
414                parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
415        return parent_state

Get the state of the parent streams.

Returns:

Mapping[str, StreamState]: A deep copy of the current state of each parent stream that has an incremental dependency, keyed by parent stream name.

Example of state format: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }

logger: logging.Logger
417    @property
418    def logger(self) -> logging.Logger:
419        return logging.getLogger("airbyte.SubstreamPartitionRouter")
@dataclass
class PartitionRouter(airbyte_cdk.sources.declarative.stream_slicers.stream_slicer.StreamSlicer):
14@dataclass
15class PartitionRouter(StreamSlicer):
16    """
17    Base class for partition routers.
18    Methods:
19        set_initial_state(stream_state): Set the state of the parent streams.
20        get_stream_state(): Get the state of the parent streams.
21    """
22
23    @abstractmethod
24    def set_initial_state(self, stream_state: StreamState) -> None:
25        """
26        Set the state of the parent streams.
27
28        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
29        incrementally using the state.
30
31        Args:
32            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
33                                        'parent_state' which is a dictionary of parent state names to their corresponding state.
34                Example:
35                {
36                    "parent_state": {
37                        "parent_stream_name_1": { ... },
38                        "parent_stream_name_2": { ... },
39                        ...
40                    }
41                }
42        """
43
44    @abstractmethod
45    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
46        """
47        Get the state of the parent streams.
48
49        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
50        incrementally using the state.
51
52        Returns:
53            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
54                 The returned format will be:
55                 {
56                     "parent_stream_name1": {
57                         "last_updated": "2023-05-27T00:00:00Z"
58                     },
59                     "parent_stream_name2": {
60                         "last_updated": "2023-05-27T00:00:00Z"
61                     }
62                 }
63        """

Base class for partition routers.

Methods:

  • set_initial_state(stream_state): Set the state of the parent streams.
  • get_stream_state(): Get the state of the parent streams.

@abstractmethod
def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
23    @abstractmethod
24    def set_initial_state(self, stream_state: StreamState) -> None:
25        """
26        Set the state of the parent streams.
27
28        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
29        incrementally using the state.
30
31        Args:
32            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
33                                        'parent_state' which is a dictionary of parent state names to their corresponding state.
34                Example:
35                {
36                    "parent_state": {
37                        "parent_stream_name_1": { ... },
38                        "parent_stream_name_2": { ... },
39                        ...
40                    }
41                }
42        """

Set the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Arguments:
  • stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 'parent_state', which maps parent stream names to their corresponding state. Example: { "parent_state": { "parent_stream_name_1": { ... }, "parent_stream_name_2": { ... }, ... } }
@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
44    @abstractmethod
45    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
46        """
47        Get the state of the parent streams.
48
49        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
50        incrementally using the state.
51
52        Returns:
53            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
54                 The returned format will be:
55                 {
56                     "parent_stream_name1": {
57                         "last_updated": "2023-05-27T00:00:00Z"
58                     },
59                     "parent_stream_name2": {
60                         "last_updated": "2023-05-27T00:00:00Z"
61                     }
62                 }
63        """

Get the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Returns:

Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }