airbyte_cdk.sources.declarative.partition_routers

 1#
 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
 6    AsyncJobPartitionRouter,
 7)
 8from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
 9    CartesianProductStreamSlicer,
10)
11from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
12    GroupingPartitionRouter,
13)
14from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
15    ListPartitionRouter,
16)
17from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
18from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
19    SinglePartitionRouter,
20)
21from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
22    SubstreamPartitionRouter,
23)
24
25__all__ = [
26    "AsyncJobPartitionRouter",
27    "CartesianProductStreamSlicer",
28    "GroupingPartitionRouter",
29    "ListPartitionRouter",
30    "SinglePartitionRouter",
31    "SubstreamPartitionRouter",
32    "PartitionRouter",
33]
@dataclass
class AsyncJobPartitionRouter(airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer):
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Stream slicer whose slices come from a job orchestrator.

    The slices of the wrapped ``stream_slicer`` are handed to the orchestrator built by
    ``job_orchestrator_factory``. Every partition the orchestrator reports as completed is
    emitted as a stream slice that carries the partition's jobs under the ``"jobs"`` extra
    field, so that records can later be retrieved through ``fetch_records``.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        self._job_orchestrator_factory = self.job_orchestrator_factory
        # Stays None until stream_slices() runs; fetch_records() refuses to work before that.
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Build a job orchestrator over the wrapped slicer's slices and yield one slice per
        partition the orchestrator reports as completed.
        """
        orchestrator = self._job_orchestrator_factory(self.stream_slicer.stream_slices())
        # Kept on the instance so that fetch_records() can reuse the same orchestrator.
        self._job_orchestrator = orchestrator

        for done in orchestrator.create_and_get_completed_partitions():
            origin = done.stream_slice
            yield StreamSlice(
                partition=dict(origin.partition),
                cursor_slice=origin.cursor_slice,
                extra_fields={"jobs": list(done.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        Fetch the records of the given jobs through the job orchestrator.

        Fetching records goes beyond what a PartitionRouter/StreamSlicer should be responsible
        for. It lives here because the JobOrchestrator is required to retrieve records; the
        alternatives would be passing the JobOrchestrator to the AsyncRetriever or storing it
        on multiple classes.

        Raises:
            AirbyteTracedException: if no orchestrator has been created yet, i.e. this is
                called before `stream_slices`.
        """
        orchestrator = self._job_orchestrator
        if orchestrator:
            return orchestrator.fetch_records(async_jobs=async_jobs)

        raise AirbyteTracedException(
            message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
            internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
            failure_type=FailureType.system_error,
        )

Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.

AsyncJobPartitionRouter( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], job_orchestrator_factory: Callable[[Iterable[airbyte_cdk.StreamSlice]], airbyte_cdk.sources.declarative.async_job.job_orchestrator.AsyncJobOrchestrator], stream_slicer: airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
40    def stream_slices(self) -> Iterable[StreamSlice]:
41        slices = self.stream_slicer.stream_slices()
42        self._job_orchestrator = self._job_orchestrator_factory(slices)
43
44        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
45            yield StreamSlice(
46                partition=dict(completed_partition.stream_slice.partition),
47                cursor_slice=completed_partition.stream_slice.cursor_slice,
48                extra_fields={"jobs": list(completed_partition.jobs)},
49            )

Defines stream slices

Returns

An iterable of stream slices

def fetch_records( self, async_jobs: Iterable[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> Iterable[Mapping[str, Any]]:
51    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
52        """
53        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
54        be responsible for. However, this was added in because the JobOrchestrator is required to
55        retrieve records. And without defining fetch_records() on this class, we're stuck with either
56        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
57        """
58
59        if not self._job_orchestrator:
60            raise AirbyteTracedException(
61                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
62                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
63                failure_type=FailureType.system_error,
64            )
65
66        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)

This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, this was added in because the JobOrchestrator is required to retrieve records. And without defining fetch_records() on this class, we're stuck with either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.

@dataclass
class CartesianProductStreamSlicer(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 40@dataclass
 41class CartesianProductStreamSlicer(PartitionRouter):
 42    """
 43    Stream slicers that iterates over the cartesian product of input stream slicers
 44    Given 2 stream slicers with the following slices:
 45    A: [{"i": 0}, {"i": 1}, {"i": 2}]
 46    B: [{"s": "hello"}, {"s": "world"}]
 47    the resulting stream slices are
 48    [
 49        {"i": 0, "s": "hello"},
 50        {"i": 0, "s": "world"},
 51        {"i": 1, "s": "hello"},
 52        {"i": 1, "s": "world"},
 53        {"i": 2, "s": "hello"},
 54        {"i": 2, "s": "world"},
 55    ]
 56
 57    Attributes:
 58        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
 59    """
 60
 61    stream_slicers: List[PartitionRouter]
 62    parameters: InitVar[Mapping[str, Any]]
 63
 64    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 65        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)
 66
 67    def get_request_params(
 68        self,
 69        *,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        return dict(
 75            ChainMap(
 76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 77                    s.get_request_params(
 78                        stream_state=stream_state,
 79                        stream_slice=stream_slice,
 80                        next_page_token=next_page_token,
 81                    )
 82                    for s in self.stream_slicers
 83                ]
 84            )
 85        )
 86
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )
106
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )
126
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )
146
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
154            if len(cursor_slices) > 1:
155                raise ValueError(
156                    f"There should only be a single cursor slice. Found {cursor_slices}"
157                )
158            if cursor_slices:
159                cursor_slice = cursor_slices[0]
160            else:
161                cursor_slice = {}
162            yield StreamSlice(
163                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
164            )
165
166    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
167        """
168        Parent stream states are not supported for cartesian product stream slicer
169        """
170        pass
171
172    @property
173    def logger(self) -> logging.Logger:
174        return logging.getLogger("airbyte.CartesianProductStreamSlicer")

Stream slicer that iterates over the cartesian product of its input stream slicers. For example, given 2 stream slicers with the following slices, A: [{"i": 0}, {"i": 1}, {"i": 2}] and B: [{"s": "hello"}, {"s": "world"}], the resulting stream slices are: [ {"i": 0, "s": "hello"}, {"i": 0, "s": "world"}, {"i": 1, "s": "hello"}, {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"}, ]

Attributes:
  • stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
CartesianProductStreamSlicer( stream_slicers: List[PartitionRouter], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_slicers: List[PartitionRouter]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
67    def get_request_params(
68        self,
69        *,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        return dict(
75            ChainMap(
76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
77                    s.get_request_params(
78                        stream_state=stream_state,
79                        stream_slice=stream_slice,
80                        next_page_token=next_page_token,
81                    )
82                    for s in self.stream_slicers
83                ]
84            )
85        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, that string is sent as is. If it returns a dict, the dict is converted to URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
154            if len(cursor_slices) > 1:
155                raise ValueError(
156                    f"There should only be a single cursor slice. Found {cursor_slices}"
157                )
158            if cursor_slices:
159                cursor_slice = cursor_slices[0]
160            else:
161                cursor_slice = {}
162            yield StreamSlice(
163                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
164            )

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
166    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
167        """
168        Parent stream states are not supported for cartesian product stream slicer
169        """
170        pass

Parent stream states are not supported for cartesian product stream slicer

logger: logging.Logger
172    @property
173    def logger(self) -> logging.Logger:
174        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
@dataclass
class GroupingPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 13@dataclass
 14class GroupingPartitionRouter(PartitionRouter):
 15    """
 16    A partition router that groups partitions from an underlying partition router into batches of a specified size.
 17    This is useful for APIs that support filtering by multiple partition keys in a single request.
 18
 19    Attributes:
 20        group_size (int): The number of partitions to include in each group.
 21        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
 22        deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
 23        config (Config): The connector configuration.
 24        parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
 25    """
 26
 27    group_size: int
 28    underlying_partition_router: PartitionRouter
 29    config: Config
 30    deduplicate: bool = True
 31
 32    def __post_init__(self) -> None:
 33        self._state: Optional[Mapping[str, StreamState]] = {}
 34
 35    def stream_slices(self) -> Iterable[StreamSlice]:
 36        """
 37        Lazily groups partitions from the underlying partition router into batches of size `group_size`.
 38
 39        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
 40        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
 41        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
 42
 43        Yields:
 44            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
 45        """
 46        batch = []
 47        seen_keys = set()
 48
 49        # Iterate over partitions lazily from the underlying router
 50        for partition in self.underlying_partition_router.stream_slices():
 51            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
 52            partition_keys = list(partition.partition.keys())
 53            # skip parent_slice as it is part of SubstreamPartitionRouter partition
 54            if "parent_slice" in partition_keys:
 55                partition_keys.remove("parent_slice")
 56            if len(partition_keys) != 1:
 57                raise ValueError(
 58                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
 59                )
 60            key = partition.partition[partition_keys[0]]
 61
 62            # Skip duplicates if deduplication is enabled
 63            if self.deduplicate and key in seen_keys:
 64                continue
 65
 66            # Add partition to the batch
 67            batch.append(partition)
 68            if self.deduplicate:
 69                seen_keys.add(key)
 70
 71            # Yield the batch when it reaches the group_size
 72            if len(batch) == self.group_size:
 73                self._state = self.underlying_partition_router.get_stream_state()
 74                yield self._create_grouped_slice(batch)
 75                batch = []  # Reset the batch
 76
 77        self._state = self.underlying_partition_router.get_stream_state()
 78        # Yield any remaining partitions if the batch isn't empty
 79        if batch:
 80            yield self._create_grouped_slice(batch)
 81
 82    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
 83        """
 84        Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.
 85
 86        Args:
 87            batch (list[StreamSlice]): A list of StreamSlice objects to group.
 88
 89        Returns:
 90            StreamSlice: A single StreamSlice with combined partition and extra field values.
 91        """
 92        # Combine partition values into a single dict with lists
 93        grouped_partition = {
 94            key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
 95        }
 96
 97        # Aggregate extra fields into a dict with list values
 98        extra_fields_dict = (
 99            {
100                key: [p.extra_fields.get(key) for p in batch]
101                for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields))
102            }
103            if any(p.extra_fields for p in batch)
104            else {}
105        )
106        return StreamSlice(
107            partition=grouped_partition,
108            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
109            extra_fields=extra_fields_dict,
110        )
111
112    def get_request_params(
113        self,
114        stream_state: Optional[StreamState] = None,
115        stream_slice: Optional[StreamSlice] = None,
116        next_page_token: Optional[Mapping[str, Any]] = None,
117    ) -> Mapping[str, Any]:
118        return {}
119
120    def get_request_headers(
121        self,
122        stream_state: Optional[StreamState] = None,
123        stream_slice: Optional[StreamSlice] = None,
124        next_page_token: Optional[Mapping[str, Any]] = None,
125    ) -> Mapping[str, Any]:
126        return {}
127
128    def get_request_body_data(
129        self,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return {}
135
136    def get_request_body_json(
137        self,
138        stream_state: Optional[StreamState] = None,
139        stream_slice: Optional[StreamSlice] = None,
140        next_page_token: Optional[Mapping[str, Any]] = None,
141    ) -> Mapping[str, Any]:
142        return {}
143
144    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
145        """Delegate state retrieval to the underlying partition router."""
146        return self._state

A partition router that groups partitions from an underlying partition router into batches of a specified size. This is useful for APIs that support filtering by multiple partition keys in a single request.

Attributes:
  • group_size (int): The number of partitions to include in each group.
  • underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
  • deduplicate (bool): If True, drops any partition whose key value was already seen earlier in the same stream_slices() call, so values are unique across all emitted groups (not only within one group).
  • config (Config): The connector configuration.
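  • parameters (Mapping[str, Any]): Described as additional parameters for interpolation and configuration, but the dataclass declares no such field; it is absent from the constructor signature shown below.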
GroupingPartitionRouter( group_size: int, underlying_partition_router: PartitionRouter, config: Mapping[str, Any], deduplicate: bool = True)
group_size: int
underlying_partition_router: PartitionRouter
config: Mapping[str, Any]
deduplicate: bool = True
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
35    def stream_slices(self) -> Iterable[StreamSlice]:
36        """
37        Lazily groups partitions from the underlying partition router into batches of size `group_size`.
38
39        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
40        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
41        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
42
43        Yields:
44            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
45        """
46        batch = []
47        seen_keys = set()
48
49        # Iterate over partitions lazily from the underlying router
50        for partition in self.underlying_partition_router.stream_slices():
51            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
52            partition_keys = list(partition.partition.keys())
53            # skip parent_slice as it is part of SubstreamPartitionRouter partition
54            if "parent_slice" in partition_keys:
55                partition_keys.remove("parent_slice")
56            if len(partition_keys) != 1:
57                raise ValueError(
58                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
59                )
60            key = partition.partition[partition_keys[0]]
61
62            # Skip duplicates if deduplication is enabled
63            if self.deduplicate and key in seen_keys:
64                continue
65
66            # Add partition to the batch
67            batch.append(partition)
68            if self.deduplicate:
69                seen_keys.add(key)
70
71            # Yield the batch when it reaches the group_size
72            if len(batch) == self.group_size:
73                self._state = self.underlying_partition_router.get_stream_state()
74                yield self._create_grouped_slice(batch)
75                batch = []  # Reset the batch
76
77        self._state = self.underlying_partition_router.get_stream_state()
78        # Yield any remaining partitions if the batch isn't empty
79        if batch:
80            yield self._create_grouped_slice(batch)

Lazily groups partitions from the underlying partition router into batches of size group_size.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer. When the buffer reaches group_size or the underlying router is exhausted, it yields a grouped slice. If deduplication is enabled, it tracks every partition value seen during the call (the set is not reset between batches), so each value is emitted at most once across all batches.

Yields:

Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.

def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
112    def get_request_params(
113        self,
114        stream_state: Optional[StreamState] = None,
115        stream_slice: Optional[StreamSlice] = None,
116        next_page_token: Optional[Mapping[str, Any]] = None,
117    ) -> Mapping[str, Any]:
118        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
120    def get_request_headers(
121        self,
122        stream_state: Optional[StreamState] = None,
123        stream_slice: Optional[StreamSlice] = None,
124        next_page_token: Optional[Mapping[str, Any]] = None,
125    ) -> Mapping[str, Any]:
126        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
128    def get_request_body_data(
129        self,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns a string, that string is sent as is. If it returns a dict, the dict is converted to URL-encoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
136    def get_request_body_json(
137        self,
138        stream_state: Optional[StreamState] = None,
139        stream_slice: Optional[StreamSlice] = None,
140        next_page_token: Optional[Mapping[str, Any]] = None,
141    ) -> Mapping[str, Any]:
142        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
144    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
145        """Delegate state retrieval to the underlying partition router."""
146        return self._state

Delegate state retrieval to the underlying partition router.

@dataclass
class ListPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 18@dataclass
 19class ListPartitionRouter(PartitionRouter):
 20    """
 21    Partition router that iterates over the values of a list
 22    If values is a string, then evaluate it as literal and assert the resulting literal is a list
 23
 24    Attributes:
 25        values (Union[str, List[str]]): The values to iterate over
 26        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
 27        config (Config): The user-provided configuration as specified by the source's spec
 28        request_option (Optional[RequestOption]): The request option to configure the HTTP request
 29    """
 30
 31    values: Union[str, List[str]]
 32    cursor_field: Union[InterpolatedString, str]
 33    config: Config
 34    parameters: InitVar[Mapping[str, Any]]
 35    request_option: Optional[RequestOption] = None
 36
 37    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 38        if isinstance(self.values, str):
 39            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
 40                self.config
 41            )
 42        self._cursor_field = (
 43            InterpolatedString(string=self.cursor_field, parameters=parameters)
 44            if isinstance(self.cursor_field, str)
 45            else self.cursor_field
 46        )
 47
 48        self._cursor = None
 49
 50    def get_request_params(
 51        self,
 52        stream_state: Optional[StreamState] = None,
 53        stream_slice: Optional[StreamSlice] = None,
 54        next_page_token: Optional[Mapping[str, Any]] = None,
 55    ) -> Mapping[str, Any]:
 56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
 58
 59    def get_request_headers(
 60        self,
 61        stream_state: Optional[StreamState] = None,
 62        stream_slice: Optional[StreamSlice] = None,
 63        next_page_token: Optional[Mapping[str, Any]] = None,
 64    ) -> Mapping[str, Any]:
 65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 66        return self._get_request_option(RequestOptionType.header, stream_slice)
 67
 68    def get_request_body_data(
 69        self,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 75        return self._get_request_option(RequestOptionType.body_data, stream_slice)
 76
 77    def get_request_body_json(
 78        self,
 79        stream_state: Optional[StreamState] = None,
 80        stream_slice: Optional[StreamSlice] = None,
 81        next_page_token: Optional[Mapping[str, Any]] = None,
 82    ) -> Mapping[str, Any]:
 83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 84        return self._get_request_option(RequestOptionType.body_json, stream_slice)
 85
 86    def stream_slices(self) -> Iterable[StreamSlice]:
 87        return [
 88            StreamSlice(
 89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
 90            )
 91            for slice_value in self.values
 92        ]
 93
 94    def _get_request_option(
 95        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
 96    ) -> Mapping[str, Any]:
 97        if (
 98            self.request_option
 99            and self.request_option.inject_into == request_option_type
100            and stream_slice
101        ):
102            slice_value = stream_slice.get(self._cursor_field.eval(self.config))
103            if slice_value:
104                options: MutableMapping[str, Any] = {}
105                self.request_option.inject_into_request(options, slice_value, self.config)
106                return options
107            else:
108                return {}
109        else:
110            return {}
111
112    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass

Partition router that iterates over the values of a list. If values is a string, it is evaluated as a literal, and the resulting literal is expected to be a list.

Attributes:
  • values (Union[str, List[str]]): The values to iterate over
  • cursor_field (Union[InterpolatedString, str]): The name of the cursor field
  • config (Config): The user-provided configuration as specified by the source's spec
  • request_option (Optional[RequestOption]): The request option to configure the HTTP request
ListPartitionRouter( values: Union[str, List[str]], cursor_field: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], request_option: Optional[airbyte_cdk.RequestOption] = None)
values: Union[str, List[str]]
cursor_field: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
request_option: Optional[airbyte_cdk.RequestOption] = None
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
50    def get_request_params(
51        self,
52        stream_state: Optional[StreamState] = None,
53        stream_slice: Optional[StreamSlice] = None,
54        next_page_token: Optional[Mapping[str, Any]] = None,
55    ) -> Mapping[str, Any]:
56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
59    def get_request_headers(
60        self,
61        stream_state: Optional[StreamState] = None,
62        stream_slice: Optional[StreamSlice] = None,
63        next_page_token: Optional[Mapping[str, Any]] = None,
64    ) -> Mapping[str, Any]:
65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
66        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
68    def get_request_body_data(
69        self,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
75        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, that text is sent as is. If it returns a dict, the dict is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
77    def get_request_body_json(
78        self,
79        stream_state: Optional[StreamState] = None,
80        stream_slice: Optional[StreamSlice] = None,
81        next_page_token: Optional[Mapping[str, Any]] = None,
82    ) -> Mapping[str, Any]:
83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
84        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
86    def stream_slices(self) -> Iterable[StreamSlice]:
87        return [
88            StreamSlice(
89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
90            )
91            for slice_value in self.values
92        ]

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
112    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass

ListPartitionRouter doesn't have parent streams

@dataclass
class SinglePartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """Partition router that emits exactly one, empty, stream slice and adds nothing to requests."""

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the query parameters contributed by this router: always none."""
        no_params: Mapping[str, Any] = {}
        return no_params

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the headers contributed by this router: always none."""
        no_headers: Mapping[str, Any] = {}
        return no_headers

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the non-JSON body payload contributed by this router: always empty."""
        no_body_data: Mapping[str, Any] = {}
        return no_body_data

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the JSON body payload contributed by this router: always empty."""
        no_body_json: Mapping[str, Any] = {}
        return no_body_json

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a single slice whose partition and cursor slice are both empty."""
        only_slice = StreamSlice(partition={}, cursor_slice={})
        yield only_slice

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter has no parent streams, so there is never any parent state to report.
        """
        return None

Partition router returning only a stream slice

SinglePartitionRouter(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
19    def get_request_params(
20        self,
21        stream_state: Optional[StreamState] = None,
22        stream_slice: Optional[StreamSlice] = None,
23        next_page_token: Optional[Mapping[str, Any]] = None,
24    ) -> Mapping[str, Any]:
25        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
27    def get_request_headers(
28        self,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
35    def get_request_body_data(
36        self,
37        stream_state: Optional[StreamState] = None,
38        stream_slice: Optional[StreamSlice] = None,
39        next_page_token: Optional[Mapping[str, Any]] = None,
40    ) -> Mapping[str, Any]:
41        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, that text is sent as is. If it returns a dict, the dict is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
43    def get_request_body_json(
44        self,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
51    def stream_slices(self) -> Iterable[StreamSlice]:
52        yield StreamSlice(partition={}, cursor_slice={})

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
54    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
55        """
56        SinglePartitionRouter doesn't have parent streams
57        """
58        pass

SinglePartitionRouter doesn't have parent streams

@dataclass
class SubstreamPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
@dataclass
class SubstreamPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the parent's stream records and emits slices.
    Each emitted slice's partition carries `partition_field` and `parent_slice` so they can be
    accessed by other components.

    Attributes:
        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
        config (Config): configuration passed to every interpolation evaluated by this router
    """

    parent_stream_configs: List[ParentStreamConfig]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the configuration and keep the parameters.

        Raises:
            ValueError: if no parent stream config was provided.
        """
        if not self.parent_stream_configs:
            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the parent keys that are configured to be injected as query parameters."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the parent keys that are configured to be injected as headers."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the parent keys that are configured to be injected as body data."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the parent keys that are configured to be injected into the JSON body."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """
        Collect, across all parent configs, the values to inject into one part of the request.

        Args:
            option_type: The part of the request being populated.
            stream_slice: The slice holding the partition values; nothing is injected without it.

        Returns:
            The mapping filled by each matching ``request_option.inject_into_request`` call.
        """
        params: MutableMapping[str, Any] = {}
        if stream_slice:
            for parent_config in self.parent_stream_configs:
                if (
                    parent_config.request_option
                    and parent_config.request_option.inject_into == option_type
                ):
                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                    value = stream_slice.get(key)
                    # Only a missing value is skipped: falsy parent keys such as 0 are valid and
                    # must still reach the request.
                    if value is not None:
                        parent_config.request_option.inject_into_request(params, value, self.config)
        return params

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its partitions.
        For each partition, iterate over each record.
        Yield a stream slice for each record that holds a value at the parent key; records without
        one are skipped.

        A parent partition that yields no record produces no slice.
        NOTE(review): in that case the parent cursor's close_partition is never called for that
        partition either - confirm this is intended.

        The template string can interpolate the following values:
        - parent_stream_slice: mapping representing the parent's stream slice
        - parent_record: mapping representing the parent record
        - parent_stream_name: string representing the parent stream name
        """
        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
            partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
            extra_fields = None
            if parent_stream_config.extra_fields:
                extra_fields = [
                    [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                    for field_path in parent_stream_config.extra_fields
                ]

            for partition, is_last_slice in iterate_with_last_flag(
                parent_stream.generate_partitions()
            ):
                for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                    partition.read()
                ):
                    # In the previous CDK implementation, state management was done internally by the stream.
                    # However, this could cause issues when doing availability check for example as the availability
                    # check would progress the state so state management was moved outside of the read method.
                    # Hence, we need to call the cursor here.
                    # Note that we call observe and close_partition before emitting the associated record as the
                    # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                    # record was consumed.
                    parent_stream.cursor.observe(parent_record)
                    parent_partition = (
                        parent_record.associated_slice.partition
                        if parent_record.associated_slice
                        else {}
                    )
                    record_data = parent_record.data

                    try:
                        partition_value = dpath.get(
                            record_data,  # type: ignore [arg-type]
                            parent_field,
                        )
                    except KeyError:
                        self.logger.debug(
                            "Skipping record of parent stream %s: no value found at parent key %s",
                            parent_stream.name,
                            parent_field,
                        )
                        # The skipped record may be the last one of its partition; the partition
                        # still has to be closed or the parent cursor never finalizes it.
                        self._close_partition_if_exhausted(
                            parent_stream, partition, is_last_record_in_slice, is_last_slice
                        )
                        continue

                    # Add extra fields
                    extracted_extra_fields = self._extract_extra_fields(record_data, extra_fields)

                    if parent_stream_config.lazy_read_pointer:
                        extracted_extra_fields = {
                            "child_response": self._extract_child_response(
                                record_data,
                                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
                            ),
                            **extracted_extra_fields,
                        }

                    self._close_partition_if_exhausted(
                        parent_stream, partition, is_last_record_in_slice, is_last_slice
                    )

                    yield StreamSlice(
                        partition={
                            partition_field: partition_value,
                            "parent_slice": parent_partition or {},
                        },
                        cursor_slice={},
                        extra_fields=extracted_extra_fields,
                    )

    @staticmethod
    def _close_partition_if_exhausted(
        parent_stream: Any,
        partition: Any,
        is_last_record_in_slice: bool,
        is_last_slice: bool,
    ) -> None:
        """Close the parent partition after its last record, and flush state after the last partition."""
        if not is_last_record_in_slice:
            return
        parent_stream.cursor.close_partition(partition)
        if is_last_slice:
            parent_stream.cursor.ensure_at_least_one_state_emitted()

    def _extract_child_response(
        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
    ) -> requests.Response:
        """
        Extract child records from a parent record based on lazy pointers.

        Args:
            parent_record: The parent record to read from.
            pointer: The path, one interpolated string per segment, to the embedded child data.

        Returns:
            A response with status 200 whose content is the JSON encoding of the data found at the
            path, or of an empty list when nothing is found there.
        """

        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
            """Create a SafeResponse with the given data."""
            response = SafeResponse()
            response.content = json.dumps(data).encode("utf-8")
            response.status_code = 200
            return response

        path = [path_part.eval(self.config) for path_part in pointer]
        return _create_response(dpath.get(parent_record, path, default=[]))  # type: ignore # argunet will be a MutableMapping, given input data structure

    def _extract_extra_fields(
        self,
        parent_record: Mapping[str, Any] | AirbyteMessage,
        extra_fields: Optional[List[List[str]]] = None,
    ) -> Mapping[str, Any]:
        """
        Extracts additional fields specified by their paths from the parent record.

        Args:
            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.

        Returns:
            Mapping[str, Any]: A dictionary containing the extracted fields.
                               The keys are the field paths joined with ".", and the values are the corresponding
                               extracted values, or None for a path that is not present in the record.
        """
        extracted_extra_fields = {}
        if extra_fields:
            for extra_field_path in extra_fields:
                try:
                    extra_field_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        extra_field_path,
                    )
                    self.logger.debug(
                        f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}"
                    )
                except KeyError:
                    self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}")
                    extra_field_value = None
                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
        return extracted_extra_fields

    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
        """
        Migrate the child or global stream state into the parent stream's state format.

        This method converts the child stream state—or, if present, the global state—into a format that is
        compatible with parent streams that use incremental synchronization. The migration occurs only for
        parent streams with incremental dependencies. It filters out per-partition states and retains only the
        global state in the form {cursor_field: cursor_value}.

        The method supports multiple input formats:
          - A simple global state, e.g.:
                {"updated_at": "2023-05-27T00:00:00Z"}
          - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
                {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
            In this case, the migration uses the first value from the "state" dictionary.
          - Any per-partition state formats or other non-simple structures are ignored during migration.

        Args:
            stream_state (StreamState): The state to migrate. Expected formats include:
                - {"updated_at": "2023-05-27T00:00:00Z"}
                - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
                  (In this format, only the first global state value is used, and per-partition states are ignored.)

        Returns:
            StreamState: A migrated state for parent streams in the format:
                {
                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
                }
            where each parent stream with an incremental dependency is assigned its corresponding cursor value.
            An empty mapping is returned when no usable cursor value is found.

        Example:
            Input: {"updated_at": "2023-05-27T00:00:00Z"}
            Output: {
                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
            }
        """
        substream_state_values = list(stream_state.values())
        substream_state = substream_state_values[0] if substream_state_values else {}

        # Ignore per-partition states or invalid formats.
        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
            # If a global state is present under the key "state", use its first value.
            if (
                "state" in stream_state
                and isinstance(stream_state["state"], dict)
                and stream_state["state"] != {}
            ):
                substream_state = list(stream_state["state"].values())[0]
            else:
                return {}

        # Build the parent state for all parent streams with incremental dependencies.
        parent_state = {}
        if substream_state:
            for parent_config in self.parent_stream_configs:
                if parent_config.incremental_dependency:
                    parent_state[parent_config.stream.name] = {
                        parent_config.stream.cursor_field: substream_state
                    }

        return parent_state

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Only parent streams configured with an incremental dependency are included; each state is
        deep-copied from the parent stream's cursor.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                parent_state[parent_config.stream.name] = copy.deepcopy(
                    parent_config.stream.cursor.state
                )
        return parent_state

    @property
    def logger(self) -> logging.Logger:
        """Logger used by this router."""
        return logging.getLogger("airbyte.SubstreamPartitionRouter")

Partition router that iterates over the parent's stream records and emits slices. Will populate the state with partition_field and parent_slice so they can be accessed by other components.

Attributes:
  • parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
SubstreamPartitionRouter( parent_stream_configs: List[airbyte_cdk.ParentStreamConfig], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parent_stream_configs: List[airbyte_cdk.ParentStreamConfig]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
126    def get_request_params(
127        self,
128        stream_state: Optional[StreamState] = None,
129        stream_slice: Optional[StreamSlice] = None,
130        next_page_token: Optional[Mapping[str, Any]] = None,
131    ) -> Mapping[str, Any]:
132        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
133        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
135    def get_request_headers(
136        self,
137        stream_state: Optional[StreamState] = None,
138        stream_slice: Optional[StreamSlice] = None,
139        next_page_token: Optional[Mapping[str, Any]] = None,
140    ) -> Mapping[str, Any]:
141        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
142        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
144    def get_request_body_data(
145        self,
146        stream_state: Optional[StreamState] = None,
147        stream_slice: Optional[StreamSlice] = None,
148        next_page_token: Optional[Mapping[str, Any]] = None,
149    ) -> Mapping[str, Any]:
150        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
151        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, that text is sent as is. If it returns a dict, the dict is converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
153    def get_request_body_json(
154        self,
155        stream_state: Optional[StreamState] = None,
156        stream_slice: Optional[StreamSlice] = None,
157        next_page_token: Optional[Mapping[str, Any]] = None,
158    ) -> Mapping[str, Any]:
159        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
160        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
178    def stream_slices(self) -> Iterable[StreamSlice]:
179        """
180        Iterate over each parent stream's record and create a StreamSlice for each record.
181
182        For each stream, iterate over its stream_slices.
183        For each stream slice, iterate over each record.
184        yield a stream slice for each such records.
185
186        If a parent slice contains no record, emit a slice with parent_record=None.
187
188        The template string can interpolate the following values:
189        - parent_stream_slice: mapping representing the parent's stream slice
190        - parent_record: mapping representing the parent record
191        - parent_stream_name: string representing the parent stream name
192        """
193        if not self.parent_stream_configs:
194            yield from []
195        else:
196            for parent_stream_config in self.parent_stream_configs:
197                parent_stream = parent_stream_config.stream
198                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
199                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
200                extra_fields = None
201                if parent_stream_config.extra_fields:
202                    extra_fields = [
203                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
204                        for field_path in parent_stream_config.extra_fields
205                    ]
206
207                for partition, is_last_slice in iterate_with_last_flag(
208                    parent_stream.generate_partitions()
209                ):
210                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(
211                        partition.read()
212                    ):
213                        # In the previous CDK implementation, state management was done internally by the stream.
214                        # However, this could cause issues when doing availability check for example as the availability
215                        # check would progress the state so state management was moved outside of the read method.
216                        # Hence, we need to call the cursor here.
217                        # Note that we call observe and close_partition before emitting the associated record as the
218                        # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
219                        # record was consumed.
220                        parent_stream.cursor.observe(parent_record)
221                        parent_partition = (
222                            parent_record.associated_slice.partition
223                            if parent_record.associated_slice
224                            else {}
225                        )
226                        record_data = parent_record.data
227
228                        try:
229                            partition_value = dpath.get(
230                                record_data,  # type: ignore [arg-type]
231                                parent_field,
232                            )
233                        except KeyError:
234                            # FIXME a log here would go a long way for debugging
235                            continue
236
237                        # Add extra fields
238                        extracted_extra_fields = self._extract_extra_fields(
239                            record_data, extra_fields
240                        )
241
242                        if parent_stream_config.lazy_read_pointer:
243                            extracted_extra_fields = {
244                                "child_response": self._extract_child_response(
245                                    record_data,
246                                    parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
247                                ),
248                                **extracted_extra_fields,
249                            }
250
251                        if is_last_record_in_slice:
252                            parent_stream.cursor.close_partition(partition)
253                            if is_last_slice:
254                                parent_stream.cursor.ensure_at_least_one_state_emitted()
255
256                        yield StreamSlice(
257                            partition={
258                                partition_field: partition_value,
259                                "parent_slice": parent_partition or {},
260                            },
261                            cursor_slice={},
262                            extra_fields=extracted_extra_fields,
263                        )
264
265                yield from []

Iterate over each parent stream's record and create a StreamSlice for each record.

For each stream, iterate over its stream_slices. For each stream slice, iterate over each record. Yield a stream slice for each such record.

If a parent slice contains no record, emit a slice with parent_record=None.

The template string can interpolate the following values:

  • parent_stream_slice: mapping representing the parent's stream slice
  • parent_record: mapping representing the parent record
  • parent_stream_name: string representing the parent stream name
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
377    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
378        """
379        Get the state of the parent streams.
380
381        Returns:
382            StreamState: The current state of the parent streams.
383
384        Example of state format:
385        {
386            "parent_stream_name1": {
387                "last_updated": "2023-05-27T00:00:00Z"
388            },
389            "parent_stream_name2": {
390                "last_updated": "2023-05-27T00:00:00Z"
391            }
392        }
393        """
394        parent_state = {}
395        for parent_config in self.parent_stream_configs:
396            if parent_config.incremental_dependency:
397                parent_state[parent_config.stream.name] = copy.deepcopy(
398                    parent_config.stream.cursor.state
399                )
400        return parent_state

Get the state of the parent streams.

Returns:

StreamState: The current state of the parent streams.

Example of state format: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }

logger: logging.Logger
402    @property
403    def logger(self) -> logging.Logger:
404        return logging.getLogger("airbyte.SubstreamPartitionRouter")
@dataclass
class PartitionRouter(airbyte_cdk.sources.declarative.stream_slicers.stream_slicer.StreamSlicer):
@dataclass
class PartitionRouter(StreamSlicer):
    """
    Abstract base class for partition routers.

    In addition to what it inherits from ``StreamSlicer``, a partition router must provide
    ``get_stream_state()``, which reports the state of its parent streams.
    """

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Return the current state of the parent streams, keyed by parent stream name.

        Implement this only when the slicer is driven by a parent stream and has to read
        that stream incrementally from its state.

        Returns:
            Optional[Mapping[str, StreamState]]: one entry per parent stream, in the form:
                 {
                     "parent_stream_name1": {
                         "last_updated": "2023-05-27T00:00:00Z"
                     },
                     "parent_stream_name2": {
                         "last_updated": "2023-05-27T00:00:00Z"
                     }
                 }
        """

Base class for partition routers.

Methods:

get_stream_state(): Get the state of the parent streams.

@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
22    @abstractmethod
23    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
24        """
25        Get the state of the parent streams.
26
27        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
28        incrementally using the state.
29
30        Returns:
31            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
32                 The returned format will be:
33                 {
34                     "parent_stream_name1": {
35                         "last_updated": "2023-05-27T00:00:00Z"
36                     },
37                     "parent_stream_name2": {
38                         "last_updated": "2023-05-27T00:00:00Z"
39                     }
40                 }
41        """

Get the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Returns:

Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }