airbyte_cdk.sources.declarative.partition_routers
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
    AsyncJobPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
    CartesianProductStreamSlicer,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
    GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
    ListPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
    SinglePartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
    SubstreamPartitionRouter,
)

__all__ = [
    "AsyncJobPartitionRouter",
    "CartesianProductStreamSlicer",
    "GroupingPartitionRouter",
    "ListPartitionRouter",
    "SinglePartitionRouter",
    "SubstreamPartitionRouter",
    "PartitionRouter",
]
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Partition router that creates async jobs in a source API, periodically polls for job
    completion, and supplies the completed job URL locations as stream slices so that
    records can be extracted.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._job_orchestrator_factory = self.job_orchestrator_factory
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
        self._parameters = parameters

    def stream_slices(self) -> Iterable[StreamSlice]:
        slices = self.stream_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(slices)

        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
            yield StreamSlice(
                partition=dict(completed_partition.stream_slice.partition),
                cursor_slice=completed_partition.stream_slice.cursor_slice,
                extra_fields={"jobs": list(completed_partition.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
        be responsible for. However, it was added because the JobOrchestrator is required to
        retrieve records, and without defining fetch_records() on this class we would be stuck
        either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
        """

        if not self._job_orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.
def stream_slices(self) -> Iterable[StreamSlice]:
    slices = self.stream_slicer.stream_slices()
    self._job_orchestrator = self._job_orchestrator_factory(slices)

    for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
        yield StreamSlice(
            partition=dict(completed_partition.stream_slice.partition),
            cursor_slice=completed_partition.stream_slice.cursor_slice,
            extra_fields={"jobs": list(completed_partition.jobs)},
        )
Defines stream slices.

Returns: An iterable of stream slices.
def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
    """
    This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
    be responsible for. However, it was added because the JobOrchestrator is required to
    retrieve records, and without defining fetch_records() on this class we would be stuck
    either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
    """

    if not self._job_orchestrator:
        raise AirbyteTracedException(
            message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
            internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
            failure_type=FailureType.system_error,
        )

    return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, it was added because the JobOrchestrator is required to retrieve records; without defining fetch_records() on this class, we would be stuck either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
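As the guard above implies, stream_slices() must be consumed before fetch_records() can be called, because the orchestrator is only instantiated inside stream_slices(). A minimal sketch of the expected call order, assuming a hypothetical make_orchestrator factory and connector config (neither is part of the API shown here):

from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter

# Hypothetical wiring: `make_orchestrator` and `config` are placeholders, not CDK API.
router = AsyncJobPartitionRouter(
    config=config,
    parameters={},
    job_orchestrator_factory=make_orchestrator,
)

for job_slice in router.stream_slices():       # creates the orchestrator and polls the jobs
    jobs = job_slice.extra_fields["jobs"]      # the completed AsyncJob instances
    for record in router.fetch_records(jobs):  # valid only after stream_slices()
        print(record)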
@dataclass
class CartesianProductStreamSlicer(PartitionRouter):
    """
    Stream slicer that iterates over the cartesian product of input stream slicers.
    Given 2 stream slicers with the following slices:
    A: [{"i": 0}, {"i": 1}, {"i": 2}]
    B: [{"s": "hello"}, {"s": "world"}]
    the resulting stream slices are
    [
        {"i": 0, "s": "hello"},
        {"i": 0, "s": "world"},
        {"i": 1, "s": "hello"},
        {"i": 1, "s": "world"},
        {"i": 2, "s": "hello"},
        {"i": 2, "s": "world"},
    ]

    Attributes:
        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g. request headers, parameters, etc.) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts, e.g. two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
    """

    stream_slicers: List[PartitionRouter]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_params(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_headers(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_data(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_json(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        sub_slices = (s.stream_slices() for s in self.stream_slicers)
        product = itertools.product(*sub_slices)
        for stream_slice_tuple in product:
            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
            if len(cursor_slices) > 1:
                raise ValueError(
                    f"There should only be a single cursor slice. Found {cursor_slices}"
                )
            if cursor_slices:
                cursor_slice = cursor_slices[0]
            else:
                cursor_slice = {}
            yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
Stream slicer that iterates over the cartesian product of input stream slicers.

Given 2 stream slicers with the following slices:

A: [{"i": 0}, {"i": 1}, {"i": 2}]
B: [{"s": "hello"}, {"s": "world"}]

the resulting stream slices are:

[
    {"i": 0, "s": "hello"},
    {"i": 0, "s": "world"},
    {"i": 1, "s": "hello"},
    {"i": 1, "s": "world"},
    {"i": 2, "s": "hello"},
    {"i": 2, "s": "world"},
]
Attributes:
- stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g. request headers, query parameters) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts, e.g. two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer. A composition example follows below.
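A minimal sketch of composing two ListPartitionRouters, assuming a trivial (empty) connector config; the values and cursor fields are illustrative:

from airbyte_cdk.sources.declarative.partition_routers import (
    CartesianProductStreamSlicer,
    ListPartitionRouter,
)

config = {}  # illustrative empty connector config

slicer_a = ListPartitionRouter(values=["0", "1", "2"], cursor_field="i", config=config, parameters={})
slicer_b = ListPartitionRouter(values=["hello", "world"], cursor_field="s", config=config, parameters={})

product = CartesianProductStreamSlicer(stream_slicers=[slicer_a, slicer_b], parameters={})
for s in product.stream_slices():
    print(s.partition)  # e.g. {"i": "0", "s": "hello"}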
def get_request_params(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return dict(
        ChainMap(
            *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                s.get_request_params(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for s in self.stream_slicers
            ]
        )
    )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return dict(
        ChainMap(
            *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                s.get_request_headers(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for s in self.stream_slicers
            ]
        )
    )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return dict(
        ChainMap(
            *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                s.get_request_body_data(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for s in self.stream_slicers
            ]
        )
    )
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
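For reference, the dict-to-form-encoding behavior described above matches the standard library's urlencode:

from urllib.parse import urlencode

assert urlencode({"key1": "value1", "key2": "value2"}) == "key1=value1&key2=value2"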
def get_request_body_json(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return dict(
        ChainMap(
            *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                s.get_request_body_json(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for s in self.stream_slicers
            ]
        )
    )
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def stream_slices(self) -> Iterable[StreamSlice]:
    sub_slices = (s.stream_slices() for s in self.stream_slicers)
    product = itertools.product(*sub_slices)
    for stream_slice_tuple in product:
        partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
        cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
        if len(cursor_slices) > 1:
            raise ValueError(
                f"There should only be a single cursor slice. Found {cursor_slices}"
            )
        if cursor_slices:
            cursor_slice = cursor_slices[0]
        else:
            cursor_slice = {}
        yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
Defines stream slices.

Returns: An iterable of stream slices.
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Parent stream states are not supported for cartesian product stream slicer
    """
    pass
Parent stream states are not supported for cartesian product stream slicer
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Parent stream states are not supported for cartesian product stream slicer
    """
    pass
Parent stream states are not supported for cartesian product stream slicer
@dataclass
class GroupingPartitionRouter(PartitionRouter):
    """
    A partition router that groups partitions from an underlying partition router into batches of a specified size.
    This is useful for APIs that support filtering by multiple partition keys in a single request.

    Attributes:
        group_size (int): The number of partitions to include in each group.
        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
        deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
        config (Config): The connector configuration.
        parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
    """

    group_size: int
    underlying_partition_router: PartitionRouter
    config: Config
    deduplicate: bool = True

    def __post_init__(self) -> None:
        self._state: Optional[Mapping[str, StreamState]] = {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Lazily groups partitions from the underlying partition router into batches of size `group_size`.

        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

        Yields:
            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
        """
        batch = []
        seen_keys = set()

        # Iterate over partitions lazily from the underlying router
        for partition in self.underlying_partition_router.stream_slices():
            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
            partition_keys = list(partition.partition.keys())
            # skip parent_slice as it is part of SubstreamPartitionRouter partition
            if "parent_slice" in partition_keys:
                partition_keys.remove("parent_slice")
            if len(partition_keys) != 1:
                raise ValueError(
                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
                )
            key = partition.partition[partition_keys[0]]

            # Skip duplicates if deduplication is enabled
            if self.deduplicate and key in seen_keys:
                continue

            # Add partition to the batch
            batch.append(partition)
            if self.deduplicate:
                seen_keys.add(key)

            # Yield the batch when it reaches the group_size
            if len(batch) == self.group_size:
                self._state = self.underlying_partition_router.get_stream_state()
                yield self._create_grouped_slice(batch)
                batch = []  # Reset the batch

        self._state = self.underlying_partition_router.get_stream_state()
        # Yield any remaining partitions if the batch isn't empty
        if batch:
            yield self._create_grouped_slice(batch)

    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
        """
        Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.

        Args:
            batch (list[StreamSlice]): A list of StreamSlice objects to group.

        Returns:
            StreamSlice: A single StreamSlice with combined partition and extra field values.
        """
        # Combine partition values into a single dict with lists
        grouped_partition = {
            key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
        }

        # Aggregate extra fields into a dict with list values
        extra_fields_dict = (
            {
                key: [p.extra_fields.get(key) for p in batch]
                for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields))
            }
            if any(p.extra_fields for p in batch)
            else {}
        )
        return StreamSlice(
            partition=grouped_partition,
            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
            extra_fields=extra_fields_dict,
        )

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Delegate state initialization to the underlying partition router."""
        self.underlying_partition_router.set_initial_state(stream_state)
        self._state = self.underlying_partition_router.get_stream_state()

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """Delegate state retrieval to the underlying partition router."""
        return self._state
A partition router that groups partitions from an underlying partition router into batches of a specified size. This is useful for APIs that support filtering by multiple partition keys in a single request.
Attributes:
- group_size (int): The number of partitions to include in each group.
- underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
- deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
- config (Config): The connector configuration.
- parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
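A minimal sketch of grouping a list router's partitions into batches, assuming an illustrative empty config (the "board_ids" field name is also illustrative):

from airbyte_cdk.sources.declarative.partition_routers import (
    GroupingPartitionRouter,
    ListPartitionRouter,
)

config = {}  # illustrative empty connector config

underlying = ListPartitionRouter(
    values=["1", "2", "3", "4", "5"], cursor_field="board_ids", config=config, parameters={}
)
router = GroupingPartitionRouter(group_size=2, underlying_partition_router=underlying, config=config)

for s in router.stream_slices():
    print(s.partition)
# {"board_ids": ["1", "2"]}
# {"board_ids": ["3", "4"]}
# {"board_ids": ["5"]}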
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Lazily groups partitions from the underlying partition router into batches of size `group_size`.

    This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
    When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
    If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

    Yields:
        Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
    """
    batch = []
    seen_keys = set()

    # Iterate over partitions lazily from the underlying router
    for partition in self.underlying_partition_router.stream_slices():
        # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
        partition_keys = list(partition.partition.keys())
        # skip parent_slice as it is part of SubstreamPartitionRouter partition
        if "parent_slice" in partition_keys:
            partition_keys.remove("parent_slice")
        if len(partition_keys) != 1:
            raise ValueError(
                f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
            )
        key = partition.partition[partition_keys[0]]

        # Skip duplicates if deduplication is enabled
        if self.deduplicate and key in seen_keys:
            continue

        # Add partition to the batch
        batch.append(partition)
        if self.deduplicate:
            seen_keys.add(key)

        # Yield the batch when it reaches the group_size
        if len(batch) == self.group_size:
            self._state = self.underlying_partition_router.get_stream_state()
            yield self._create_grouped_slice(batch)
            batch = []  # Reset the batch

    self._state = self.underlying_partition_router.get_stream_state()
    # Yield any remaining partitions if the batch isn't empty
    if batch:
        yield self._create_grouped_slice(batch)
Lazily groups partitions from the underlying partition router into batches of size group_size.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer. When the buffer reaches group_size or the underlying router is exhausted, it yields a grouped slice. If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

Yields:
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def set_initial_state(self, stream_state: StreamState) -> None:
    """Delegate state initialization to the underlying partition router."""
    self.underlying_partition_router.set_initial_state(stream_state)
    self._state = self.underlying_partition_router.get_stream_state()
Delegate state initialization to the underlying partition router.
@dataclass
class ListPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the values of a list.
    If values is a string, it is evaluated as a literal and the result is asserted to be a list.

    Attributes:
        values (Union[str, List[str]]): The values to iterate over
        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
        config (Config): The user-provided configuration as specified by the source's spec
        request_option (Optional[RequestOption]): The request option to configure the HTTP request
    """

    values: Union[str, List[str]]
    cursor_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.values, str):
            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
                self.config
            )
        self._cursor_field = (
            InterpolatedString(string=self.cursor_field, parameters=parameters)
            if isinstance(self.cursor_field, str)
            else self.cursor_field
        )

        self._cursor = None

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def stream_slices(self) -> Iterable[StreamSlice]:
        return [
            StreamSlice(
                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
            )
            for slice_value in self.values
        ]

    def _get_request_option(
        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        if (
            self.request_option
            and self.request_option.inject_into == request_option_type
            and stream_slice
        ):
            slice_value = stream_slice.get(self._cursor_field.eval(self.config))
            if slice_value:
                options: MutableMapping[str, Any] = {}
                self.request_option.inject_into_request(options, slice_value, self.config)
                return options
            else:
                return {}
        else:
            return {}

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass
Partition router that iterates over the values of a list. If values is a string, it is evaluated as a literal and the result is asserted to be a list.
Attributes:
- values (Union[str, List[str]]): The values to iterate over
- cursor_field (Union[InterpolatedString, str]): The name of the cursor field
- config (Config): The user-provided configuration as specified by the source's spec
- request_option (Optional[RequestOption]): The request option to configure the HTTP request
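A minimal sketch, assuming an illustrative empty config; the request_option injects each slice value as a query parameter (the "repository" field name and values are illustrative):

from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter
from airbyte_cdk.sources.declarative.requesters.request_option import (
    RequestOption,
    RequestOptionType,
)

config = {}  # illustrative empty connector config

router = ListPartitionRouter(
    values=["airbyte", "airbyte-cloud"],
    cursor_field="repository",
    config=config,
    parameters={},
    request_option=RequestOption(
        inject_into=RequestOptionType.request_parameter,
        field_name="repository",
        parameters={},
    ),
)

for s in router.stream_slices():
    print(s.partition, router.get_request_params(stream_slice=s))
# {"repository": "airbyte"} {"repository": "airbyte"}
# {"repository": "airbyte-cloud"} {"repository": "airbyte-cloud"}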
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def stream_slices(self) -> Iterable[StreamSlice]:
    return [
        StreamSlice(
            partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
        )
        for slice_value in self.values
    ]
Defines stream slices.

Returns: An iterable of stream slices.
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """Partition router that returns a single stream slice"""

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        yield StreamSlice(partition={}, cursor_slice={})

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass
Partition router that returns a single stream slice
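This is the default, no-op router: it emits exactly one empty slice. A minimal sketch:

from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter

router = SinglePartitionRouter(parameters={})
slices = list(router.stream_slices())
assert len(slices) == 1
assert slices[0].partition == {} and slices[0].cursor_slice == {}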
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    return {}
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def stream_slices(self) -> Iterable[StreamSlice]:
    yield StreamSlice(partition={}, cursor_slice={})
Defines stream slices.

Returns: An iterable of stream slices.
@dataclass
class SubstreamPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the parent's stream records and emits slices.
    Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components

    Attributes:
        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
    """

    parent_stream_configs: List[ParentStreamConfig]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if not self.parent_stream_configs:
            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        params: MutableMapping[str, Any] = {}
        if stream_slice:
            for parent_config in self.parent_stream_configs:
                if (
                    parent_config.request_option
                    and parent_config.request_option.inject_into == option_type
                ):
                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                    value = stream_slice.get(key)
                    if value:
                        parent_config.request_option.inject_into_request(params, value, self.config)
        return params

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its stream_slices.
        For each stream slice, iterate over each record.
        Yield a stream slice for each such record.

        If a parent slice contains no record, emit a slice with parent_record=None.

        The template string can interpolate the following values:
        - parent_stream_slice: mapping representing the parent's stream slice
        - parent_record: mapping representing the parent record
        - parent_stream_name: string representing the parent stream name
        """
        if not self.parent_stream_configs:
            yield from []
        else:
            for parent_stream_config in self.parent_stream_configs:
                parent_stream = parent_stream_config.stream
                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                extra_fields = None
                if parent_stream_config.extra_fields:
                    extra_fields = [
                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                        for field_path in parent_stream_config.extra_fields
                    ]

                # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
                # not support either substreams or RFR, but something that needs to be considered once we do
                for parent_record in parent_stream.read_only_records():
                    parent_partition = None
                    # Skip non-records (eg AirbyteLogMessage)
                    if isinstance(parent_record, AirbyteMessage):
                        self.logger.warning(
                            f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
                        )
                        if parent_record.type == MessageType.RECORD:
                            parent_record = parent_record.record.data  # type: ignore[union-attr, assignment]  # record is always a Record
                        else:
                            continue
                    elif isinstance(parent_record, Record):
                        parent_partition = (
                            parent_record.associated_slice.partition
                            if parent_record.associated_slice
                            else {}
                        )
                        parent_record = parent_record.data
                    elif not isinstance(parent_record, Mapping):
                        # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
                        raise AirbyteTracedException(
                            message=f"Parent stream returned records as invalid type {type(parent_record)}"
                        )
                    try:
                        partition_value = dpath.get(
                            parent_record,  # type: ignore [arg-type]
                            parent_field,
                        )
                    except KeyError:
                        continue

                    # Add extra fields
                    extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)

                    if parent_stream_config.lazy_read_pointer:
                        extracted_extra_fields = {
                            "child_response": self._extract_child_response(
                                parent_record,
                                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                            ),
                            **extracted_extra_fields,
                        }

                    yield StreamSlice(
                        partition={
                            partition_field: partition_value,
                            "parent_slice": parent_partition or {},
                        },
                        cursor_slice={},
                        extra_fields=extracted_extra_fields,
                    )

    def _extract_child_response(
        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
    ) -> requests.Response:
        """Extract child records from a parent record based on lazy pointers."""

        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
            """Create a SafeResponse with the given data."""
            response = SafeResponse()
            response.content = json.dumps(data).encode("utf-8")
            response.status_code = 200
            return response

        path = [path.eval(self.config) for path in pointer]
        return _create_response(dpath.get(parent_record, path, default=[]))  # type: ignore # argument will be a MutableMapping, given input data structure

    def _extract_extra_fields(
        self,
        parent_record: Mapping[str, Any] | AirbyteMessage,
        extra_fields: Optional[List[List[str]]] = None,
    ) -> Mapping[str, Any]:
        """
        Extracts additional fields specified by their paths from the parent record.

        Args:
            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.

        Returns:
            Mapping[str, Any]: A dictionary containing the extracted fields.
                The keys are the joined field paths, and the values are the corresponding extracted values.
        """
        extracted_extra_fields = {}
        if extra_fields:
            for extra_field_path in extra_fields:
                try:
                    extra_field_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        extra_field_path,
                    )
                    self.logger.debug(
                        f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}"
                    )
                except KeyError:
                    self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}")
                    extra_field_value = None
                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
        return extracted_extra_fields

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the state of the parent streams.

        If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
        This migration applies only to parent streams with incremental dependencies.

        Args:
            stream_state (StreamState): The state of the streams to be set.

        Example of state format:
        {
            "parent_state": {
                "parent_stream_name1": {
                    "last_updated": "2023-05-27T00:00:00Z"
                },
                "parent_stream_name2": {
                    "last_updated": "2023-05-27T00:00:00Z"
                }
            }
        }

        Example of migrating to parent state format:
        - Initial state:
        {
            "updated_at": "2023-05-27T00:00:00Z"
        }
        - After migration:
        {
            "updated_at": "2023-05-27T00:00:00Z",
            "parent_state": {
                "parent_stream_name": {
                    "parent_stream_cursor": "2023-05-27T00:00:00Z"
                }
            }
        }
        """
        if not stream_state:
            return

        parent_state = stream_state.get("parent_state", {})

        # Set state for each parent stream with an incremental dependency
        for parent_config in self.parent_stream_configs:
            if (
                not parent_state.get(parent_config.stream.name, {})
                and parent_config.incremental_dependency
            ):
                # Migrate child state to parent state format
                parent_state = self._migrate_child_state_to_parent_state(stream_state)

            if parent_config.incremental_dependency:
                parent_config.stream.state = parent_state.get(parent_config.stream.name, {})

    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
        """
        Migrate the child or global stream state into the parent stream's state format.

        This method converts the child stream state, or, if present, the global state, into a format that is
        compatible with parent streams that use incremental synchronization. The migration occurs only for
        parent streams with incremental dependencies. It filters out per-partition states and retains only the
        global state in the form {cursor_field: cursor_value}.

        The method supports multiple input formats:
        - A simple global state, e.g.:
          {"updated_at": "2023-05-27T00:00:00Z"}
        - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
          {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
          In this case, the migration uses the first value from the "state" dictionary.
        - Any per-partition state formats or other non-simple structures are ignored during migration.

        Args:
            stream_state (StreamState): The state to migrate. Expected formats include:
                - {"updated_at": "2023-05-27T00:00:00Z"}
                - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
                  (In this format, only the first global state value is used, and per-partition states are ignored.)

        Returns:
            StreamState: A migrated state for parent streams in the format:
                {
                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
                }
                where each parent stream with an incremental dependency is assigned its corresponding cursor value.

        Example:
            Input: {"updated_at": "2023-05-27T00:00:00Z"}
            Output: {
                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
            }
        """
        substream_state_values = list(stream_state.values())
        substream_state = substream_state_values[0] if substream_state_values else {}

        # Ignore per-partition states or invalid formats.
        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
            # If a global state is present under the key "state", use its first value.
            if (
                "state" in stream_state
                and isinstance(stream_state["state"], dict)
                and stream_state["state"] != {}
            ):
                substream_state = list(stream_state["state"].values())[0]
            else:
                return {}

        # Build the parent state for all parent streams with incremental dependencies.
        parent_state = {}
        if substream_state:
            for parent_config in self.parent_stream_configs:
                if parent_config.incremental_dependency:
                    parent_state[parent_config.stream.name] = {
                        parent_config.stream.cursor_field: substream_state
                    }

        return parent_state

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
        return parent_state

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.SubstreamPartitionRouter")
Partition router that iterates over the parent's stream records and emits slices. Will populate the state with partition_field and parent_slice so they can be accessed by other components.
Attributes:
- parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
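A hedged sketch of the wiring, assuming a parent DeclarativeStream `projects_stream` built elsewhere; the ParentStreamConfig fields shown follow the dataclass used by this router, but the field names "id" and "project_id" are illustrative:

from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
    ParentStreamConfig,
)

config = {}  # illustrative empty connector config

# `projects_stream` is a placeholder for a parent stream constructed elsewhere.
router = SubstreamPartitionRouter(
    parent_stream_configs=[
        ParentStreamConfig(
            stream=projects_stream,
            parent_key="id",               # field read from each parent record
            partition_field="project_id",  # key under which the value lands in the slice
            config=config,
            parameters={},
        )
    ],
    config=config,
    parameters={},
)

for s in router.stream_slices():
    print(s.partition)  # e.g. {"project_id": 42, "parent_slice": {}}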
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g., you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
    return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Note that only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Iterate over each parent stream's record and create a StreamSlice for each record.

    For each stream, iterate over its stream_slices.
    For each stream slice, iterate over each record.
    Yield a stream slice for each such record.

    If a parent slice contains no record, emit a slice with parent_record=None.

    The template string can interpolate the following values:
    - parent_stream_slice: mapping representing the parent's stream slice
    - parent_record: mapping representing the parent record
    - parent_stream_name: string representing the parent stream name
    """
    if not self.parent_stream_configs:
        yield from []
    else:
        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
            partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
            extra_fields = None
            if parent_stream_config.extra_fields:
                extra_fields = [
                    [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                    for field_path in parent_stream_config.extra_fields
                ]

            # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
            # not support either substreams or RFR, but something that needs to be considered once we do
            for parent_record in parent_stream.read_only_records():
                parent_partition = None
                # Skip non-records (eg AirbyteLogMessage)
                if isinstance(parent_record, AirbyteMessage):
                    self.logger.warning(
                        f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
                    )
                    if parent_record.type == MessageType.RECORD:
                        parent_record = parent_record.record.data  # type: ignore[union-attr, assignment]  # record is always a Record
                    else:
                        continue
                elif isinstance(parent_record, Record):
                    parent_partition = (
                        parent_record.associated_slice.partition
                        if parent_record.associated_slice
                        else {}
                    )
                    parent_record = parent_record.data
                elif not isinstance(parent_record, Mapping):
                    # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
                    raise AirbyteTracedException(
                        message=f"Parent stream returned records as invalid type {type(parent_record)}"
                    )
                try:
                    partition_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        parent_field,
                    )
                except KeyError:
                    continue

                # Add extra fields
                extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)

                if parent_stream_config.lazy_read_pointer:
                    extracted_extra_fields = {
                        "child_response": self._extract_child_response(
                            parent_record,
                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                        ),
                        **extracted_extra_fields,
                    }

                yield StreamSlice(
                    partition={
                        partition_field: partition_value,
                        "parent_slice": parent_partition or {},
                    },
                    cursor_slice={},
                    extra_fields=extracted_extra_fields,
                )
Iterate over each parent stream's record and create a StreamSlice for each record.
For each stream, iterate over its stream_slices. For each stream slice, iterate over each record. Yield a stream slice for each such record.
If a parent slice contains no record, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the state of the parent streams.

    If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
    This migration applies only to parent streams with incremental dependencies.

    Args:
        stream_state (StreamState): The state of the streams to be set.

    Example of state format:
    {
        "parent_state": {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
    }

    Example of migrating to parent state format:
    - Initial state:
    {
        "updated_at": "2023-05-27T00:00:00Z"
    }
    - After migration:
    {
        "updated_at": "2023-05-27T00:00:00Z",
        "parent_state": {
            "parent_stream_name": {
                "parent_stream_cursor": "2023-05-27T00:00:00Z"
            }
        }
    }
    """
    if not stream_state:
        return

    parent_state = stream_state.get("parent_state", {})

    # Set state for each parent stream with an incremental dependency
    for parent_config in self.parent_stream_configs:
        if (
            not parent_state.get(parent_config.stream.name, {})
            and parent_config.incremental_dependency
        ):
            # Migrate child state to parent state format
            parent_state = self._migrate_child_state_to_parent_state(stream_state)

        if parent_config.incremental_dependency:
            parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
Set the state of the parent streams.
If the parent_state key is missing from stream_state, migrate the child stream state to the parent stream's state format. This migration applies only to parent streams with incremental dependencies.
Arguments:
- stream_state (StreamState): The state of the streams to be set.
Example of state format:

{
    "parent_state": {
        "parent_stream_name1": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_stream_name2": {
            "last_updated": "2023-05-27T00:00:00Z"
        }
    }
}
Example of migrating to parent state format:
- Initial state:

  {
      "updated_at": "2023-05-27T00:00:00Z"
  }
- After migration:

  {
      "updated_at": "2023-05-27T00:00:00Z",
      "parent_state": {
          "parent_stream_name": {
              "parent_stream_cursor": "2023-05-27T00:00:00Z"
          }
      }
  }
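A hedged sketch of this migration path, reusing the router from the earlier example but assuming its ParentStreamConfig was built with incremental_dependency=True and that the parent stream "projects" has cursor_field "updated_at" (all illustrative):

# Child-format state, as persisted before parent_state existed.
legacy_child_state = {"updated_at": "2023-05-27T00:00:00Z"}

router.set_initial_state(legacy_child_state)

# The migrated state is pushed onto each incremental parent stream, so
# get_stream_state() now reports it in parent format:
print(router.get_stream_state())
# {"projects": {"updated_at": "2023-05-27T00:00:00Z"}}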
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Get the state of the parent streams.

    Returns:
        StreamState: The current state of the parent streams.

    Example of state format:
    {
        "parent_stream_name1": {
            "last_updated": "2023-05-27T00:00:00Z"
        },
        "parent_stream_name2": {
            "last_updated": "2023-05-27T00:00:00Z"
        }
    }
    """
    parent_state = {}
    for parent_config in self.parent_stream_configs:
        if parent_config.incremental_dependency:
            parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
    return parent_state
Get the state of the parent streams.
Returns:
StreamState: The current state of the parent streams.
Example of state format:

{
    "parent_stream_name1": {
        "last_updated": "2023-05-27T00:00:00Z"
    },
    "parent_stream_name2": {
        "last_updated": "2023-05-27T00:00:00Z"
    }
}
@dataclass
class PartitionRouter(StreamSlicer):
    """
    Base class for partition routers.

    Methods:
        set_initial_state(stream_state): Set the state of the parent streams.
        get_stream_state(): Get the state of the parent streams.
    """

    @abstractmethod
    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Set the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Args:
            stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
                'parent_state' which is a dictionary of parent state names to their corresponding state.
                Example:
                {
                    "parent_state": {
                        "parent_stream_name_1": { ... },
                        "parent_stream_name_2": { ... },
                        ...
                    }
                }
        """

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Returns:
            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
                The returned format will be:
                {
                    "parent_stream_name1": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_stream_name2": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
        """
Base class for partition routers.
Methods:
set_initial_state(stream_state): Set the state of the parent streams.
get_stream_state(): Get the state of the parent streams.
@abstractmethod
def set_initial_state(self, stream_state: StreamState) -> None:
    """
    Set the state of the parent streams.

    This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
    incrementally using the state.

    Args:
        stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes
            'parent_state' which is a dictionary of parent state names to their corresponding state.
            Example:
            {
                "parent_state": {
                    "parent_stream_name_1": { ... },
                    "parent_stream_name_2": { ... },
                    ...
                }
            }
    """
Set the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 'parent_state' which is a dictionary of parent state names to their corresponding state. Example:

  {
      "parent_state": {
          "parent_stream_name_1": { ... },
          "parent_stream_name_2": { ... },
          ...
      }
  }
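Routers with no parent streams typically implement both state methods as no-ops, as SinglePartitionRouter and ListPartitionRouter do. A minimal sketch of a custom subclass (the region-based slicing is illustrative, not part of the CDK):

from dataclasses import dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.partition_routers import PartitionRouter
from airbyte_cdk.sources.types import StreamSlice, StreamState


@dataclass
class StaticRegionRouter(PartitionRouter):
    """Illustrative router emitting one slice per hard-coded region."""

    regions = ("us", "eu")

    def stream_slices(self) -> Iterable[StreamSlice]:
        for region in self.regions:
            yield StreamSlice(partition={"region": region}, cursor_slice={})

    def set_initial_state(self, stream_state: StreamState) -> None:
        pass  # no parent streams, nothing to restore

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        return None  # no parent streams, nothing to checkpoint

    # Request-option hooks inherited from StreamSlicer: nothing to inject.
    def get_request_params(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        return {}

    def get_request_headers(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(self, *args: Any, **kwargs: Any) -> Mapping[str, Any]:
        return {}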
@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """
    Get the state of the parent streams.

    This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
    incrementally using the state.

    Returns:
        Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
            The returned format will be:
            {
                "parent_stream_name1": {
                    "last_updated": "2023-05-27T00:00:00Z"
                },
                "parent_stream_name2": {
                    "last_updated": "2023-05-27T00:00:00Z"
                }
            }
    """
Get the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Returns:
Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be:

{
    "parent_stream_name1": {
        "last_updated": "2023-05-27T00:00:00Z"
    },
    "parent_stream_name2": {
        "last_updated": "2023-05-27T00:00:00Z"
    }
}