airbyte_cdk.sources.declarative.partition_routers

 1#
 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
 6    AsyncJobPartitionRouter,
 7)
 8from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
 9    CartesianProductStreamSlicer,
10)
11from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
12    GroupingPartitionRouter,
13)
14from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
15    ListPartitionRouter,
16)
17from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
18from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
19    SinglePartitionRouter,
20)
21from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
22    SubstreamPartitionRouter,
23)
24
25__all__ = [
26    "AsyncJobPartitionRouter",
27    "CartesianProductStreamSlicer",
28    "GroupingPartitionRouter",
29    "ListPartitionRouter",
30    "SinglePartitionRouter",
31    "SubstreamPartitionRouter",
32    "PartitionRouter",
33]
@dataclass
class AsyncJobPartitionRouter(airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer):
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Stream slicer whose slices come from a job orchestrator.

    `stream_slices()` passes the slices of the wrapped `stream_slicer` to a newly built
    orchestrator and emits one slice per completed partition, with that partition's jobs
    attached under the `jobs` extra field. `fetch_records()` reads records through the same
    orchestrator. Job creation and polling are the orchestrator's responsibility and are not
    implemented in this class.

    Attributes:
        config (Config): The connector configuration.
        parameters (Mapping[str, Any]): Init-only value, kept on the instance as `_parameters`.
        job_orchestrator_factory: Callable that builds the orchestrator from the wrapped
            slicer's slices.
        stream_slicer (StreamSlicer): Source of the slices handed to the factory. Defaults to
            `SinglePartitionRouter(parameters={})`.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        self._job_orchestrator_factory = self.job_orchestrator_factory
        # Remains None until stream_slices() has started running.
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Yield one slice per partition that the job orchestrator reports as completed.

        Each time the generator is started, a new orchestrator is built from the wrapped
        slicer's slices and stored on the instance so that `fetch_records()` can reuse it.
        """
        orchestrator = self._job_orchestrator_factory(self.stream_slicer.stream_slices())
        self._job_orchestrator = orchestrator

        for completed in orchestrator.create_and_get_completed_partitions():
            source_slice = completed.stream_slice
            yield StreamSlice(
                # Copy the partition into a plain dict; the jobs travel along as an extra field.
                partition=dict(source_slice.partition),
                cursor_slice=source_slice.cursor_slice,
                extra_fields={"jobs": list(completed.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        Read the records of `async_jobs` through the orchestrator built by `stream_slices()`.

        Fetching records goes beyond what a PartitionRouter/StreamSlicer should normally do.
        It lives here because the orchestrator is required to retrieve records; the
        alternatives were passing the orchestrator to the AsyncRetriever or storing it on
        several classes.

        Raises:
            AirbyteTracedException: If no orchestrator is available, i.e. `stream_slices()`
                has not started running yet.
        """
        orchestrator = self._job_orchestrator
        if not orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return orchestrator.fetch_records(async_jobs=async_jobs)

Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.

AsyncJobPartitionRouter( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], job_orchestrator_factory: Callable[[Iterable[airbyte_cdk.StreamSlice]], airbyte_cdk.sources.declarative.async_job.job_orchestrator.AsyncJobOrchestrator], stream_slicer: airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer.StreamSlicer = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
40    def stream_slices(self) -> Iterable[StreamSlice]:
41        slices = self.stream_slicer.stream_slices()
42        self._job_orchestrator = self._job_orchestrator_factory(slices)
43
44        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
45            yield StreamSlice(
46                partition=dict(completed_partition.stream_slice.partition),
47                cursor_slice=completed_partition.stream_slice.cursor_slice,
48                extra_fields={"jobs": list(completed_partition.jobs)},
49            )

Defines stream slices

Returns

An iterable of stream slices

def fetch_records( self, async_jobs: Iterable[airbyte_cdk.sources.declarative.async_job.job.AsyncJob]) -> Iterable[Mapping[str, Any]]:
51    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
52        """
53        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
54        be responsible for. However, this was added in because the JobOrchestrator is required to
55        retrieve records. And without defining fetch_records() on this class, we're stuck with either
56        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
57        """
58
59        if not self._job_orchestrator:
60            raise AirbyteTracedException(
61                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
62                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
63                failure_type=FailureType.system_error,
64            )
65
66        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)

This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, this was added in because the JobOrchestrator is required to retrieve records. And without defining fetch_records() on this class, we're stuck with either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.

@dataclass
class CartesianProductStreamSlicer(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 40@dataclass
 41class CartesianProductStreamSlicer(PartitionRouter):
 42    """
 43    Stream slicers that iterates over the cartesian product of input stream slicers
 44    Given 2 stream slicers with the following slices:
 45    A: [{"i": 0}, {"i": 1}, {"i": 2}]
 46    B: [{"s": "hello"}, {"s": "world"}]
 47    the resulting stream slices are
 48    [
 49        {"i": 0, "s": "hello"},
 50        {"i": 0, "s": "world"},
 51        {"i": 1, "s": "hello"},
 52        {"i": 1, "s": "world"},
 53        {"i": 2, "s": "hello"},
 54        {"i": 2, "s": "world"},
 55    ]
 56
 57    Attributes:
 58        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
 59    """
 60
 61    stream_slicers: List[PartitionRouter]
 62    parameters: InitVar[Mapping[str, Any]]
 63
 64    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 65        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)
 66
 67    def get_request_params(
 68        self,
 69        *,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        return dict(
 75            ChainMap(
 76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 77                    s.get_request_params(
 78                        stream_state=stream_state,
 79                        stream_slice=stream_slice,
 80                        next_page_token=next_page_token,
 81                    )
 82                    for s in self.stream_slicers
 83                ]
 84            )
 85        )
 86
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )
106
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )
126
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )
146
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
154            if len(cursor_slices) > 1:
155                raise ValueError(
156                    f"There should only be a single cursor slice. Found {cursor_slices}"
157                )
158            if cursor_slices:
159                cursor_slice = cursor_slices[0]
160            else:
161                cursor_slice = {}
162            yield StreamSlice(
163                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
164            )
165
166    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
167        """
168        Parent stream states are not supported for cartesian product stream slicer
169        """
170        pass
171
172    @property
173    def logger(self) -> logging.Logger:
174        return logging.getLogger("airbyte.CartesianProductStreamSlicer")

Stream slicer that iterates over the cartesian product of the slices of its input stream slicers. Given 2 stream slicers with the following slices: A: [{"i": 0}, {"i": 1}, {"i": 2}] B: [{"s": "hello"}, {"s": "world"}] the resulting stream slices are [ {"i": 0, "s": "hello"}, {"i": 0, "s": "world"}, {"i": 1, "s": "hello"}, {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"}, ]

Attributes:
  • stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
CartesianProductStreamSlicer( stream_slicers: List[PartitionRouter], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_slicers: List[PartitionRouter]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
67    def get_request_params(
68        self,
69        *,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        return dict(
75            ChainMap(
76                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
77                    s.get_request_params(
78                        stream_state=stream_state,
79                        stream_slice=stream_slice,
80                        next_page_token=next_page_token,
81                    )
82                    for s in self.stream_slicers
83                ]
84            )
85        )

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
 87    def get_request_headers(
 88        self,
 89        *,
 90        stream_state: Optional[StreamState] = None,
 91        stream_slice: Optional[StreamSlice] = None,
 92        next_page_token: Optional[Mapping[str, Any]] = None,
 93    ) -> Mapping[str, Any]:
 94        return dict(
 95            ChainMap(
 96                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
 97                    s.get_request_headers(
 98                        stream_state=stream_state,
 99                        stream_slice=stream_slice,
100                        next_page_token=next_page_token,
101                    )
102                    for s in self.stream_slicers
103                ]
104            )
105        )

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
107    def get_request_body_data(
108        self,
109        *,
110        stream_state: Optional[StreamState] = None,
111        stream_slice: Optional[StreamSlice] = None,
112        next_page_token: Optional[Mapping[str, Any]] = None,
113    ) -> Mapping[str, Any]:
114        return dict(
115            ChainMap(
116                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
117                    s.get_request_body_data(
118                        stream_state=stream_state,
119                        stream_slice=stream_slice,
120                        next_page_token=next_page_token,
121                    )
122                    for s in self.stream_slicers
123                ]
124            )
125        )

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, the text is sent as is. If it returns a dict, the dict is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, *, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
127    def get_request_body_json(
128        self,
129        *,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return dict(
135            ChainMap(
136                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
137                    s.get_request_body_json(
138                        stream_state=stream_state,
139                        stream_slice=stream_slice,
140                        next_page_token=next_page_token,
141                    )
142                    for s in self.stream_slicers
143                ]
144            )
145        )

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
147    def stream_slices(self) -> Iterable[StreamSlice]:
148        sub_slices = (s.stream_slices() for s in self.stream_slicers)
149        product = itertools.product(*sub_slices)
150        for stream_slice_tuple in product:
151            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
152            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
153            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
154            if len(cursor_slices) > 1:
155                raise ValueError(
156                    f"There should only be a single cursor slice. Found {cursor_slices}"
157                )
158            if cursor_slices:
159                cursor_slice = cursor_slices[0]
160            else:
161                cursor_slice = {}
162            yield StreamSlice(
163                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
164            )

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
166    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
167        """
168        Parent stream states are not supported for cartesian product stream slicer
169        """
170        pass

Parent stream states are not supported for cartesian product stream slicer

logger: logging.Logger
172    @property
173    def logger(self) -> logging.Logger:
174        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
@dataclass
class GroupingPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 13@dataclass
 14class GroupingPartitionRouter(PartitionRouter):
 15    """
 16    A partition router that groups partitions from an underlying partition router into batches of a specified size.
 17    This is useful for APIs that support filtering by multiple partition keys in a single request.
 18
 19    Attributes:
 20        group_size (int): The number of partitions to include in each group.
 21        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
 22        deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
 23        config (Config): The connector configuration.
 24        parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
 25    """
 26
 27    group_size: int
 28    underlying_partition_router: PartitionRouter
 29    config: Config
 30    deduplicate: bool = True
 31
 32    def __post_init__(self) -> None:
 33        self._state: Optional[Mapping[str, StreamState]] = {}
 34
 35    def stream_slices(self) -> Iterable[StreamSlice]:
 36        """
 37        Lazily groups partitions from the underlying partition router into batches of size `group_size`.
 38
 39        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
 40        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
 41        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
 42
 43        Yields:
 44            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
 45        """
 46        batch = []
 47        seen_keys = set()
 48
 49        # Iterate over partitions lazily from the underlying router
 50        for partition in self.underlying_partition_router.stream_slices():
 51            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
 52            partition_keys = list(partition.partition.keys())
 53            # skip parent_slice as it is part of SubstreamPartitionRouter partition
 54            if "parent_slice" in partition_keys:
 55                partition_keys.remove("parent_slice")
 56            if len(partition_keys) != 1:
 57                raise ValueError(
 58                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
 59                )
 60            key = partition.partition[partition_keys[0]]
 61
 62            # Skip duplicates if deduplication is enabled
 63            if self.deduplicate and key in seen_keys:
 64                continue
 65
 66            # Add partition to the batch
 67            batch.append(partition)
 68            if self.deduplicate:
 69                seen_keys.add(key)
 70
 71            # Yield the batch when it reaches the group_size
 72            if len(batch) == self.group_size:
 73                self._state = self.underlying_partition_router.get_stream_state()
 74                yield self._create_grouped_slice(batch)
 75                batch = []  # Reset the batch
 76
 77        self._state = self.underlying_partition_router.get_stream_state()
 78        # Yield any remaining partitions if the batch isn't empty
 79        if batch:
 80            yield self._create_grouped_slice(batch)
 81
 82    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
 83        """
 84        Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.
 85
 86        Args:
 87            batch (list[StreamSlice]): A list of StreamSlice objects to group.
 88
 89        Returns:
 90            StreamSlice: A single StreamSlice with combined partition and extra field values.
 91        """
 92        # Combine partition values into a single dict with lists
 93        grouped_partition = {
 94            key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
 95        }
 96
 97        # Aggregate extra fields into a dict with list values
 98        extra_fields_dict = (
 99            {
100                key: [p.extra_fields.get(key) for p in batch]
101                for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields))
102            }
103            if any(p.extra_fields for p in batch)
104            else {}
105        )
106        return StreamSlice(
107            partition=grouped_partition,
108            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
109            extra_fields=extra_fields_dict,
110        )
111
112    def get_request_params(
113        self,
114        stream_state: Optional[StreamState] = None,
115        stream_slice: Optional[StreamSlice] = None,
116        next_page_token: Optional[Mapping[str, Any]] = None,
117    ) -> Mapping[str, Any]:
118        return {}
119
120    def get_request_headers(
121        self,
122        stream_state: Optional[StreamState] = None,
123        stream_slice: Optional[StreamSlice] = None,
124        next_page_token: Optional[Mapping[str, Any]] = None,
125    ) -> Mapping[str, Any]:
126        return {}
127
128    def get_request_body_data(
129        self,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return {}
135
136    def get_request_body_json(
137        self,
138        stream_state: Optional[StreamState] = None,
139        stream_slice: Optional[StreamSlice] = None,
140        next_page_token: Optional[Mapping[str, Any]] = None,
141    ) -> Mapping[str, Any]:
142        return {}
143
144    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
145        """Delegate state retrieval to the underlying partition router."""
146        return self._state

A partition router that groups partitions from an underlying partition router into batches of a specified size. This is useful for APIs that support filtering by multiple partition keys in a single request.

Attributes:
  • group_size (int): The number of partitions to include in each group.
  • underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
  • deduplicate (bool): If True, drops any partition whose key value has already been seen during the current stream_slices() run, so each value appears at most once across all groups (not only within a single group).
  • config (Config): The connector configuration.
  • parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration. Note: the constructor signature of this class does not declare a parameters field.
GroupingPartitionRouter( group_size: int, underlying_partition_router: PartitionRouter, config: Mapping[str, Any], deduplicate: bool = True)
group_size: int
underlying_partition_router: PartitionRouter
config: Mapping[str, Any]
deduplicate: bool = True
def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
35    def stream_slices(self) -> Iterable[StreamSlice]:
36        """
37        Lazily groups partitions from the underlying partition router into batches of size `group_size`.
38
39        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
40        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
41        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
42
43        Yields:
44            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
45        """
46        batch = []
47        seen_keys = set()
48
49        # Iterate over partitions lazily from the underlying router
50        for partition in self.underlying_partition_router.stream_slices():
51            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
52            partition_keys = list(partition.partition.keys())
53            # skip parent_slice as it is part of SubstreamPartitionRouter partition
54            if "parent_slice" in partition_keys:
55                partition_keys.remove("parent_slice")
56            if len(partition_keys) != 1:
57                raise ValueError(
58                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
59                )
60            key = partition.partition[partition_keys[0]]
61
62            # Skip duplicates if deduplication is enabled
63            if self.deduplicate and key in seen_keys:
64                continue
65
66            # Add partition to the batch
67            batch.append(partition)
68            if self.deduplicate:
69                seen_keys.add(key)
70
71            # Yield the batch when it reaches the group_size
72            if len(batch) == self.group_size:
73                self._state = self.underlying_partition_router.get_stream_state()
74                yield self._create_grouped_slice(batch)
75                batch = []  # Reset the batch
76
77        self._state = self.underlying_partition_router.get_stream_state()
78        # Yield any remaining partitions if the batch isn't empty
79        if batch:
80            yield self._create_grouped_slice(batch)

Lazily groups partitions from the underlying partition router into batches of size group_size.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer. When the buffer reaches group_size or the underlying router is exhausted, it yields a grouped slice. If deduplication is enabled, it tracks the partition key values seen so far in the run, so a value is emitted at most once across all batches.

Yields:

Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.

def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
112    def get_request_params(
113        self,
114        stream_state: Optional[StreamState] = None,
115        stream_slice: Optional[StreamSlice] = None,
116        next_page_token: Optional[Mapping[str, Any]] = None,
117    ) -> Mapping[str, Any]:
118        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
120    def get_request_headers(
121        self,
122        stream_state: Optional[StreamState] = None,
123        stream_slice: Optional[StreamSlice] = None,
124        next_page_token: Optional[Mapping[str, Any]] = None,
125    ) -> Mapping[str, Any]:
126        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
128    def get_request_body_data(
129        self,
130        stream_state: Optional[StreamState] = None,
131        stream_slice: Optional[StreamSlice] = None,
132        next_page_token: Optional[Mapping[str, Any]] = None,
133    ) -> Mapping[str, Any]:
134        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready-made text, the text is sent as is. If it returns a dict, the dict is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
136    def get_request_body_json(
137        self,
138        stream_state: Optional[StreamState] = None,
139        stream_slice: Optional[StreamSlice] = None,
140        next_page_token: Optional[Mapping[str, Any]] = None,
141    ) -> Mapping[str, Any]:
142        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
144    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
145        """Delegate state retrieval to the underlying partition router."""
146        return self._state

Delegate state retrieval to the underlying partition router.

@dataclass
class ListPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
 18@dataclass
 19class ListPartitionRouter(PartitionRouter):
 20    """
 21    Partition router that iterates over the values of a list
 22    If values is a string, then evaluate it as literal and assert the resulting literal is a list
 23
 24    Attributes:
 25        values (Union[str, List[str]]): The values to iterate over
 26        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
 27        config (Config): The user-provided configuration as specified by the source's spec
 28        request_option (Optional[RequestOption]): The request option to configure the HTTP request
 29    """
 30
 31    values: Union[str, List[str]]
 32    cursor_field: Union[InterpolatedString, str]
 33    config: Config
 34    parameters: InitVar[Mapping[str, Any]]
 35    request_option: Optional[RequestOption] = None
 36
 37    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 38        if isinstance(self.values, str):
 39            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
 40                self.config
 41            )
 42        self._cursor_field = (
 43            InterpolatedString(string=self.cursor_field, parameters=parameters)
 44            if isinstance(self.cursor_field, str)
 45            else self.cursor_field
 46        )
 47
 48        self._cursor = None
 49
 50    def get_request_params(
 51        self,
 52        stream_state: Optional[StreamState] = None,
 53        stream_slice: Optional[StreamSlice] = None,
 54        next_page_token: Optional[Mapping[str, Any]] = None,
 55    ) -> Mapping[str, Any]:
 56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
 58
 59    def get_request_headers(
 60        self,
 61        stream_state: Optional[StreamState] = None,
 62        stream_slice: Optional[StreamSlice] = None,
 63        next_page_token: Optional[Mapping[str, Any]] = None,
 64    ) -> Mapping[str, Any]:
 65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 66        return self._get_request_option(RequestOptionType.header, stream_slice)
 67
 68    def get_request_body_data(
 69        self,
 70        stream_state: Optional[StreamState] = None,
 71        stream_slice: Optional[StreamSlice] = None,
 72        next_page_token: Optional[Mapping[str, Any]] = None,
 73    ) -> Mapping[str, Any]:
 74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 75        return self._get_request_option(RequestOptionType.body_data, stream_slice)
 76
 77    def get_request_body_json(
 78        self,
 79        stream_state: Optional[StreamState] = None,
 80        stream_slice: Optional[StreamSlice] = None,
 81        next_page_token: Optional[Mapping[str, Any]] = None,
 82    ) -> Mapping[str, Any]:
 83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
 84        return self._get_request_option(RequestOptionType.body_json, stream_slice)
 85
 86    def stream_slices(self) -> Iterable[StreamSlice]:
 87        return [
 88            StreamSlice(
 89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
 90            )
 91            for slice_value in self.values
 92        ]
 93
 94    def _get_request_option(
 95        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
 96    ) -> Mapping[str, Any]:
 97        if (
 98            self.request_option
 99            and self.request_option.inject_into == request_option_type
100            and stream_slice
101        ):
102            slice_value = stream_slice.get(self._cursor_field.eval(self.config))
103            if slice_value:
104                options: MutableMapping[str, Any] = {}
105                self.request_option.inject_into_request(options, slice_value, self.config)
106                return options
107            else:
108                return {}
109        else:
110            return {}
111
112    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass

Partition router that iterates over the values of a list. If values is a string, then evaluate it as literal and assert the resulting literal is a list.

Attributes:
  • values (Union[str, List[str]]): The values to iterate over
  • cursor_field (Union[InterpolatedString, str]): The name of the cursor field
  • config (Config): The user-provided configuration as specified by the source's spec
  • request_option (Optional[RequestOption]): The request option to configure the HTTP request
ListPartitionRouter( values: Union[str, List[str]], cursor_field: Union[airbyte_cdk.InterpolatedString, str], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], request_option: Optional[airbyte_cdk.RequestOption] = None)
values: Union[str, List[str]]
cursor_field: Union[airbyte_cdk.InterpolatedString, str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
request_option: Optional[airbyte_cdk.RequestOption] = None
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
50    def get_request_params(
51        self,
52        stream_state: Optional[StreamState] = None,
53        stream_slice: Optional[StreamSlice] = None,
54        next_page_token: Optional[Mapping[str, Any]] = None,
55    ) -> Mapping[str, Any]:
56        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
57        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
59    def get_request_headers(
60        self,
61        stream_state: Optional[StreamState] = None,
62        stream_slice: Optional[StreamSlice] = None,
63        next_page_token: Optional[Mapping[str, Any]] = None,
64    ) -> Mapping[str, Any]:
65        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
66        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
68    def get_request_body_data(
69        self,
70        stream_state: Optional[StreamState] = None,
71        stream_slice: Optional[StreamSlice] = None,
72        next_page_token: Optional[Mapping[str, Any]] = None,
73    ) -> Mapping[str, Any]:
74        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
75        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
77    def get_request_body_json(
78        self,
79        stream_state: Optional[StreamState] = None,
80        stream_slice: Optional[StreamSlice] = None,
81        next_page_token: Optional[Mapping[str, Any]] = None,
82    ) -> Mapping[str, Any]:
83        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
84        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
86    def stream_slices(self) -> Iterable[StreamSlice]:
87        return [
88            StreamSlice(
89                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
90            )
91            for slice_value in self.values
92        ]

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
112    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
113        """
114        ListPartitionRouter doesn't have parent streams
115        """
116        pass

ListPartitionRouter doesn't have parent streams

@dataclass
class SinglePartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """
    Partition router that yields exactly one stream slice.

    The slice has an empty partition and an empty cursor slice, and the router
    contributes nothing to outgoing requests.
    """

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no query parameters are added."""
        return dict()

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no headers are added."""
        return dict()

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no non-JSON body data is added."""
        return dict()

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return an empty mapping: no JSON body content is added."""
        return dict()

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield a single ``StreamSlice`` with empty ``partition`` and ``cursor_slice``."""
        only_slice = StreamSlice(partition={}, cursor_slice={})
        yield only_slice

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter doesn't have parent streams, so there is no state to report.
        """
        return None

Partition router returning only a stream slice

SinglePartitionRouter(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
19    def get_request_params(
20        self,
21        stream_state: Optional[StreamState] = None,
22        stream_slice: Optional[StreamSlice] = None,
23        next_page_token: Optional[Mapping[str, Any]] = None,
24    ) -> Mapping[str, Any]:
25        return {}

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g: you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
27    def get_request_headers(
28        self,
29        stream_state: Optional[StreamState] = None,
30        stream_slice: Optional[StreamSlice] = None,
31        next_page_token: Optional[Mapping[str, Any]] = None,
32    ) -> Mapping[str, Any]:
33        return {}

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
35    def get_request_body_data(
36        self,
37        stream_state: Optional[StreamState] = None,
38        stream_slice: Optional[StreamSlice] = None,
39        next_page_token: Optional[Mapping[str, Any]] = None,
40    ) -> Mapping[str, Any]:
41        return {}

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
43    def get_request_body_json(
44        self,
45        stream_state: Optional[StreamState] = None,
46        stream_slice: Optional[StreamSlice] = None,
47        next_page_token: Optional[Mapping[str, Any]] = None,
48    ) -> Mapping[str, Any]:
49        return {}

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
51    def stream_slices(self) -> Iterable[StreamSlice]:
52        yield StreamSlice(partition={}, cursor_slice={})

Defines stream slices

Returns

An iterable of stream slices

def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
54    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
55        """
56        SinglePartitionRouter doesn't have parent streams
57        """
58        pass

SinglePartitionRouter doesn't have parent streams

@dataclass
class SubstreamPartitionRouter(airbyte_cdk.sources.declarative.partition_routers.PartitionRouter):
@dataclass
class SubstreamPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the parent streams' records and emits one slice per record.

    The partition of every emitted slice holds the value read from the parent record under the
    configured `partition_field` key, plus a `parent_slice` entry with the partition of the slice
    the parent record came from, so they can be accessed by other components.

    Attributes:
        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
        config (Config): configuration used to evaluate the interpolated keys and request options
        parameters (Mapping[str, Any]): init-only parameters, kept on the instance as `_parameters`
    """

    parent_stream_configs: List[ParentStreamConfig]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Validate the configuration and keep the init-only parameters.

        Raises:
            ValueError: if no parent stream config was provided.
        """
        if not self.parent_stream_configs:
            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the partition values to inject as query parameters for the given slice."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the partition values to inject as headers for the given slice."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the partition values to inject as non-JSON body data for the given slice."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return the partition values to inject into the JSON body for the given slice."""
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """
        Collect the request options of every parent config that injects into `option_type`.

        For each such parent config, the value stored in `stream_slice` under the evaluated
        partition field is injected, provided that value is truthy. Returns an empty mapping
        when `stream_slice` is falsy or nothing matches.
        """
        params: MutableMapping[str, Any] = {}
        if stream_slice:
            for parent_config in self.parent_stream_configs:
                if (
                    parent_config.request_option
                    and parent_config.request_option.inject_into == option_type
                ):
                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                    value = stream_slice.get(key)
                    if value:
                        parent_config.request_option.inject_into_request(params, value, self.config)
        return params

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its partitions.
        For each partition, iterate over each record.
        Yield a stream slice for each such record.

        No slice is emitted for a parent partition that contains no record, nor for a record
        in which the parent key cannot be found (the latter is reported in the debug log).

        Each emitted slice has the partition
        `{partition_field: <value read from the record>, "parent_slice": <parent partition or {}>}`,
        an empty cursor slice, and the configured extra fields. When a lazy read pointer is
        configured, a `child_response` entry is included in the extra fields as well.
        """
        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
            partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
            extra_fields = None
            if parent_stream_config.extra_fields:
                extra_fields = [
                    [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                    for field_path in parent_stream_config.extra_fields
                ]

            for partition, is_last_slice in iterate_with_last_flag(
                parent_stream.generate_partitions()
            ):
                if partition is None:
                    break
                for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                    partition.read()
                ):
                    emit_slice = parent_record is not None
                    if parent_record is not None:
                        # In the previous CDK implementation, state management was done internally by the stream.
                        # However, this could cause issues when doing availability check for example as the availability
                        # check would progress the state so state management was moved outside of the read method.
                        # Hence, we need to call the cursor here.
                        # Note that we call observe and close_partition before emitting the associated record as the
                        # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                        # record was consumed.
                        parent_stream.cursor.observe(parent_record)
                        parent_partition = (
                            parent_record.associated_slice.partition
                            if parent_record.associated_slice
                            else {}
                        )
                        record_data = parent_record.data

                        try:
                            partition_value = dpath.get(
                                record_data,  # type: ignore [arg-type]
                                parent_field,
                            )
                        except KeyError:
                            # The record cannot be used to build a slice. Only the key and the
                            # stream name are logged, never the record content.
                            self.logger.debug(
                                "Parent key %s not found in a record of parent stream %s; no slice is emitted for this record",
                                parent_field,
                                parent_stream.name,
                            )
                            emit_slice = False

                        if emit_slice:
                            # Add extra fields
                            extracted_extra_fields = self._extract_extra_fields(
                                record_data, extra_fields
                            )

                            if parent_stream_config.lazy_read_pointer:
                                extracted_extra_fields = {
                                    "child_response": self._extract_child_response(
                                        record_data,
                                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                                    ),
                                    **extracted_extra_fields,
                                }

                    # The cursor is closed before the slice of the last record is yielded, see the note above.
                    if is_last_record_in_slice:
                        parent_stream.cursor.close_partition(partition)
                        if is_last_slice:
                            parent_stream.cursor.ensure_at_least_one_state_emitted()

                    if emit_slice:
                        yield StreamSlice(
                            partition={
                                partition_field: partition_value,
                                "parent_slice": parent_partition or {},
                            },
                            cursor_slice={},
                            extra_fields=extracted_extra_fields,
                        )

    def _extract_child_response(
        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
    ) -> requests.Response:
        """
        Extract child records from a parent record based on lazy pointers.

        The data found in `parent_record` at the evaluated `pointer` path (an empty list when
        the path is missing) is JSON-encoded into the content of a `SafeResponse` with status
        code 200.
        """

        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
            """Create a SafeResponse with the given data."""
            response = SafeResponse()
            response.content = json.dumps(data).encode("utf-8")
            response.status_code = 200
            return response

        evaluated_path = [path_part.eval(self.config) for path_part in pointer]
        return _create_response(dpath.get(parent_record, evaluated_path, default=[]))  # type: ignore # argument will be a MutableMapping, given input data structure

    def _extract_extra_fields(
        self,
        parent_record: Mapping[str, Any] | AirbyteMessage,
        extra_fields: Optional[List[List[str]]] = None,
    ) -> Mapping[str, Any]:
        """
        Extracts additional fields specified by their paths from the parent record.

        Args:
            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.

        Returns:
            Mapping[str, Any]: A dictionary containing the extracted fields.
                               The keys are the field paths joined with ".", and the values are the corresponding
                               extracted values, or None for a path that is missing from the record.
        """
        extracted_extra_fields = {}
        if extra_fields:
            for extra_field_path in extra_fields:
                try:
                    extra_field_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        extra_field_path,
                    )
                    # Lazy %-formatting: the message is only built when debug logging is enabled.
                    self.logger.debug(
                        "Extracted extra_field_path: %s with value: %s",
                        extra_field_path,
                        extra_field_value,
                    )
                except KeyError:
                    self.logger.debug("Failed to extract extra_field_path: %s", extra_field_path)
                    extra_field_value = None
                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
        return extracted_extra_fields

    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
        """
        Migrate the child or global stream state into the parent stream's state format.

        This method converts the child stream state—or, if present, the global state—into a format that is
        compatible with parent streams that use incremental synchronization. The migration occurs only for
        parent streams with incremental dependencies. It filters out per-partition states and retains only the
        global state in the form {cursor_field: cursor_value}.

        The method supports multiple input formats:
          - A simple global state, e.g.:
                {"updated_at": "2023-05-27T00:00:00Z"}
          - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
                {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
            In this case, the migration uses the first value from the "state" dictionary.
          - Any per-partition state formats or other non-simple structures are ignored during migration.

        Args:
            stream_state (StreamState): The state to migrate. Expected formats include:
                - {"updated_at": "2023-05-27T00:00:00Z"}
                - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
                  (In this format, only the first global state value is used, and per-partition states are ignored.)

        Returns:
            StreamState: A migrated state for parent streams in the format:
                {
                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
                }
            where each parent stream with an incremental dependency is assigned its corresponding cursor value.
            An empty mapping is returned when no usable global state is found.

        Example:
            Input: {"updated_at": "2023-05-27T00:00:00Z"}
            Output: {
                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
            }
        """
        substream_state_values = list(stream_state.values())
        substream_state = substream_state_values[0] if substream_state_values else {}

        # Ignore per-partition states or invalid formats.
        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
            # If a global state is present under the key "state", use its first value.
            if (
                "state" in stream_state
                and isinstance(stream_state["state"], dict)
                and stream_state["state"] != {}
            ):
                substream_state = list(stream_state["state"].values())[0]
            else:
                return {}

        # Build the parent state for all parent streams with incremental dependencies.
        parent_state = {}
        if substream_state:
            for parent_config in self.parent_stream_configs:
                if parent_config.incremental_dependency:
                    parent_state[parent_config.stream.name] = {
                        parent_config.stream.cursor_field: substream_state
                    }

        return parent_state

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Only parent streams configured with an incremental dependency are included; each
        entry is a deep copy of that parent stream's cursor state.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                parent_state[parent_config.stream.name] = copy.deepcopy(
                    parent_config.stream.cursor.state
                )
        return parent_state

    @property
    def logger(self) -> logging.Logger:
        """Logger named "airbyte.SubstreamPartitionRouter"."""
        return logging.getLogger("airbyte.SubstreamPartitionRouter")

Partition router that iterates over the parent's stream records and emits slices. Will populate the state with partition_field and parent_slice so they can be accessed by other components.

Attributes:
  • parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
SubstreamPartitionRouter( parent_stream_configs: List[airbyte_cdk.ParentStreamConfig], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parent_stream_configs: List[airbyte_cdk.ParentStreamConfig]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_params( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
127    def get_request_params(
128        self,
129        stream_state: Optional[StreamState] = None,  # not read by this method
130        stream_slice: Optional[StreamSlice] = None,
131        next_page_token: Optional[Mapping[str, Any]] = None,  # not read by this method
132    ) -> Mapping[str, Any]:
133        # Pass the stream_slice from the argument, not the cursor, because the cursor is only updated after the response has been processed
134        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

E.g., you might want to define query parameters for paging if next_page_token is not None.

def get_request_headers( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
136    def get_request_headers(
137        self,
138        stream_state: Optional[StreamState] = None,  # not read by this method
139        stream_slice: Optional[StreamSlice] = None,
140        next_page_token: Optional[Mapping[str, Any]] = None,  # not read by this method
141    ) -> Mapping[str, Any]:
142        # Pass the stream_slice from the argument, not the cursor, because the cursor is only updated after the response has been processed
143        return self._get_request_option(RequestOptionType.header, stream_slice)

Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.

def get_request_body_data( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
145    def get_request_body_data(
146        self,
147        stream_state: Optional[StreamState] = None,  # not read by this method
148        stream_slice: Optional[StreamSlice] = None,
149        next_page_token: Optional[Mapping[str, Any]] = None,  # not read by this method
150    ) -> Mapping[str, Any]:
151        # Pass the stream_slice from the argument, not the cursor, because the cursor is only updated after the response has been processed
152        return self._get_request_option(RequestOptionType.body_data, stream_slice)

Specifies how to populate the body of the request with a non-JSON payload.

If it returns ready text, that text will be sent as is. If it returns a dict, the dict will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def get_request_body_json( self, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Mapping[str, Any]:
154    def get_request_body_json(
155        self,
156        stream_state: Optional[StreamState] = None,  # not read by this method
157        stream_slice: Optional[StreamSlice] = None,
158        next_page_token: Optional[Mapping[str, Any]] = None,  # not read by this method
159    ) -> Mapping[str, Any]:
160        # Pass the stream_slice from the argument, not the cursor, because the cursor is only updated after the response has been processed
161        return self._get_request_option(RequestOptionType.body_json, stream_slice)

Specifies how to populate the body of the request with a JSON payload.

At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.

def stream_slices(self) -> Iterable[airbyte_cdk.StreamSlice]:
179    def stream_slices(self) -> Iterable[StreamSlice]:
180        """
181        Iterate over each parent stream's records and create a StreamSlice for each record.
182
183        For each parent stream, iterate over the partitions returned by generate_partitions().
184        For each partition, iterate over each record returned by partition.read().
185        Yield a stream slice for each such record.
186
187        No slice is emitted for a None record or for a record in which the parent key cannot be found.
188
189        The template string can interpolate the following values (NOTE(review): no template is evaluated in this method; confirm this list is still relevant):
190        - parent_stream_slice: mapping representing the parent's stream slice
191        - parent_record: mapping representing the parent record
192        - parent_stream_name: string representing the parent stream name
193        """
194        if not self.parent_stream_configs:
195            yield from []  # nothing to yield when no parent stream is configured
196        else:
197            for parent_stream_config in self.parent_stream_configs:
198                parent_stream = parent_stream_config.stream
199                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always cast to an interpolated string
200                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always cast to an interpolated string
201                extra_fields = None
202                if parent_stream_config.extra_fields:
203                    extra_fields = [
204                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
205                        for field_path in parent_stream_config.extra_fields
206                    ]
207
208                for partition, is_last_slice in iterate_with_last_flag(
209                    parent_stream.generate_partitions()
210                ):
211                    if partition is None:  # a None partition ends the read of this parent stream
212                        break
213                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(
214                        partition.read()
215                    ):
216                        emit_slice = parent_record is not None
217                        if parent_record is not None:
218                            # In the previous CDK implementation, state management was done internally by the stream.
219                            # However, this could cause issues when doing availability check for example as the availability
220                            # check would progress the state so state management was moved outside of the read method.
221                            # Hence, we need to call the cursor here.
222                            # Note that we call observe and close_partition before emitting the associated record as the
223                            # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
224                            # record was consumed.
225                            parent_stream.cursor.observe(parent_record)
226                            parent_partition = (
227                                parent_record.associated_slice.partition
228                                if parent_record.associated_slice
229                                else {}
230                            )
231                            record_data = parent_record.data
232
233                            try:
234                                partition_value = dpath.get(
235                                    record_data,  # type: ignore [arg-type]
236                                    parent_field,
237                                )
238                            except KeyError:
239                                # FIXME a log here would go a long way for debugging: the record is dropped silently when parent_field is missing
240                                emit_slice = False
241
242                            if emit_slice:
243                                # Add extra fields
244                                extracted_extra_fields = self._extract_extra_fields(
245                                    record_data, extra_fields
246                                )
247
248                                if parent_stream_config.lazy_read_pointer:
249                                    extracted_extra_fields = {
250                                        "child_response": self._extract_child_response(
251                                            record_data,
252                                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handled in __post_init__ of parent_stream_config
253                                        ),
254                                        **extracted_extra_fields,
255                                    }
256
257                        if is_last_record_in_slice:  # runs even when the record itself produced no slice
258                            parent_stream.cursor.close_partition(partition)
259                            if is_last_slice:
260                                parent_stream.cursor.ensure_at_least_one_state_emitted()
261
262                        if emit_slice:  # the cursor calls above have already run for this record
263                            yield StreamSlice(
264                                partition={
265                                    partition_field: partition_value,
266                                    "parent_slice": parent_partition or {},
267                                },
268                                cursor_slice={},
269                                extra_fields=extracted_extra_fields,
270                            )
271
272                yield from []  # yields nothing

Iterate over each parent stream's record and create a StreamSlice for each record.

For each stream, iterate over its stream_slices. For each stream slice, iterate over each record. Yield a stream slice for each such record.

If a parent slice contains no record, emit a slice with parent_record=None.

The template string can interpolate the following values:

  • parent_stream_slice: mapping representing the parent's stream slice
  • parent_record: mapping representing the parent record
  • parent_stream_name: string representing the parent stream name
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
384    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
385        """
386        Get the cursor state of every parent stream flagged with incremental_dependency.
387
388        Returns:
389            Optional[Mapping[str, StreamState]]: A deep copy of each such parent's cursor state, keyed by parent stream name (empty if none qualifies).
390
391        Example of state format:
392        {
393            "parent_stream_name1": {
394                "last_updated": "2023-05-27T00:00:00Z"
395            },
396            "parent_stream_name2": {
397                "last_updated": "2023-05-27T00:00:00Z"
398            }
399        }
400        """
401        parent_state = {}
402        for parent_config in self.parent_stream_configs:
403            if parent_config.incremental_dependency:  # parents without the flag are left out of the result
404                parent_state[parent_config.stream.name] = copy.deepcopy(  # deep copy: the result does not alias the cursor's state object
405                    parent_config.stream.cursor.state
406                )
407        return parent_state

Get the state of the parent streams.

Returns:

StreamState: The current state of the parent streams.

Example of state format: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }

logger: logging.Logger
409    @property
410    def logger(self) -> logging.Logger:
411        return logging.getLogger("airbyte.SubstreamPartitionRouter")  # looked up by this fixed name on every access
@dataclass
class PartitionRouter(airbyte_cdk.sources.declarative.stream_slicers.stream_slicer.StreamSlicer):
14@dataclass
15class PartitionRouter(StreamSlicer):
16    """
17    Base class for partition routers.
18    Methods:
19        get_stream_state(): Get the state of the parent streams (declared abstract).
20    """
21
22    @abstractmethod
23    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
24        """
25        Get the state of the parent streams.
26
27        This method is only meaningful if the slicer is based on some parent stream and needs to read this stream
28        incrementally using the state. NOTE(review): it is declared abstract; what other routers should return is not shown here.
29
30        Returns:
31            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
32                 The returned format will be:
33                 {
34                     "parent_stream_name1": {
35                         "last_updated": "2023-05-27T00:00:00Z"
36                     },
37                     "parent_stream_name2": {
38                         "last_updated": "2023-05-27T00:00:00Z"
39                     }
40                 }
41        """

Base class for partition routers.

Methods:

get_stream_state(): Get the state of the parent streams.

@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, Mapping[str, Any]]]:
22    @abstractmethod
23    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
24        """
25        Get the state of the parent streams.
26
27        This method is only meaningful if the slicer is based on some parent stream and needs to read this stream
28        incrementally using the state. NOTE(review): it is declared abstract; what other routers should return is not shown here.
29
30        Returns:
31            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
32                 The returned format will be:
33                 {
34                     "parent_stream_name1": {
35                         "last_updated": "2023-05-27T00:00:00Z"
36                     },
37                     "parent_stream_name2": {
38                         "last_updated": "2023-05-27T00:00:00Z"
39                     }
40                 }
41        """

Get the state of the parent streams.

This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.

Returns:

Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }