airbyte_cdk.sources.declarative.partition_routers
1# 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( 6 AsyncJobPartitionRouter, 7) 8from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import ( 9 CartesianProductStreamSlicer, 10) 11from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( 12 GroupingPartitionRouter, 13) 14from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ( 15 ListPartitionRouter, 16) 17from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter 18from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( 19 SinglePartitionRouter, 20) 21from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( 22 SubstreamPartitionRouter, 23) 24 25__all__ = [ 26 "AsyncJobPartitionRouter", 27 "CartesianProductStreamSlicer", 28 "GroupingPartitionRouter", 29 "ListPartitionRouter", 30 "SinglePartitionRouter", 31 "SubstreamPartitionRouter", 32 "PartitionRouter", 33]
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Partition router that creates async jobs in a source API, periodically polls for job
    completion, and supplies the completed job URL locations as stream slices so that
    records can be extracted.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Keep a private alias to the factory; the orchestrator itself is only
        # built once stream_slices() runs.
        self._job_orchestrator_factory = self.job_orchestrator_factory
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
        self._parameters = parameters

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Create async jobs for the upstream slices and yield one slice per completed partition.

        Each yielded slice carries the completed jobs under the "jobs" extra field so the
        retriever can fetch records from them later.
        """
        upstream_slices = self.stream_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(upstream_slices)

        for completed in self._job_orchestrator.create_and_get_completed_partitions():
            source_slice = completed.stream_slice
            yield StreamSlice(
                partition=dict(source_slice.partition),
                cursor_slice=source_slice.cursor_slice,
                extra_fields={"jobs": list(completed.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
        be responsible for. However, this was added in because the JobOrchestrator is required to
        retrieve records. And without defining fetch_records() on this class, we're stuck with either
        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
        """
        # The orchestrator only exists after stream_slices() has been consumed.
        if not self._job_orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.
40 def stream_slices(self) -> Iterable[StreamSlice]: 41 slices = self.stream_slicer.stream_slices() 42 self._job_orchestrator = self._job_orchestrator_factory(slices) 43 44 for completed_partition in self._job_orchestrator.create_and_get_completed_partitions(): 45 yield StreamSlice( 46 partition=dict(completed_partition.stream_slice.partition), 47 cursor_slice=completed_partition.stream_slice.cursor_slice, 48 extra_fields={"jobs": list(completed_partition.jobs)}, 49 )
Defines stream slices
Returns
An iterable of stream slices
51 def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]: 52 """ 53 This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should 54 be responsible for. However, this was added in because the JobOrchestrator is required to 55 retrieve records. And without defining fetch_records() on this class, we're stuck with either 56 passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes. 57 """ 58 59 if not self._job_orchestrator: 60 raise AirbyteTracedException( 61 message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support", 62 internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`", 63 failure_type=FailureType.system_error, 64 ) 65 66 return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, this was added in because the JobOrchestrator is required to retrieve records. And without defining fetch_records() on this class, we're stuck with either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
@dataclass
class CartesianProductStreamSlicer(PartitionRouter):
    """
    Stream slicer that iterates over the cartesian product of input stream slicers.
    Given 2 stream slicers with the following slices:
    A: [{"i": 0}, {"i": 1}, {"i": 2}]
    B: [{"s": "hello"}, {"s": "world"}]
    the resulting stream slices are
    [
        {"i": 0, "s": "hello"},
        {"i": 0, "s": "world"},
        {"i": 1, "s": "hello"},
        {"i": 1, "s": "world"},
        {"i": 2, "s": "hello"},
        {"i": 2, "s": "world"},
    ]

    Attributes:
        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
    """

    stream_slicers: List[PartitionRouter]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)

    def _merge_request_options(
        self,
        getter_name: str,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> Mapping[str, Any]:
        """Merge one kind of request option across all underlying slicers.

        Options are collected in slicer order, then applied in reverse so that on key
        conflicts the value from the earliest slicer wins (same precedence as ChainMap).
        """
        collected = [
            getattr(slicer, getter_name)(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
            for slicer in self.stream_slicers
        ]
        merged: dict = {}
        for options in reversed(collected):
            merged.update(options)
        return merged

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the query parameters of every underlying slicer; first slicer wins on conflicts."""
        return self._merge_request_options(
            "get_request_params", stream_state, stream_slice, next_page_token
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the request headers of every underlying slicer; first slicer wins on conflicts."""
        return self._merge_request_options(
            "get_request_headers", stream_state, stream_slice, next_page_token
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the form-body data of every underlying slicer; first slicer wins on conflicts."""
        return self._merge_request_options(
            "get_request_body_data", stream_state, stream_slice, next_page_token
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the JSON body of every underlying slicer; first slicer wins on conflicts."""
        return self._merge_request_options(
            "get_request_body_json", stream_state, stream_slice, next_page_token
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield one slice per element of the cartesian product of the underlying slicers."""
        for combo in itertools.product(
            *(slicer.stream_slices() for slicer in self.stream_slicers)
        ):
            # Reverse-order update so the earliest slicer in the tuple wins on conflicts,
            # matching ChainMap precedence.
            partition: dict = {}
            extra_fields: dict = {}
            for piece in reversed(combo):
                partition.update(piece.partition)
                extra_fields.update(piece.extra_fields)

            cursor_slices = [piece.cursor_slice for piece in combo if piece.cursor_slice]
            if len(cursor_slices) > 1:
                raise ValueError(
                    f"There should only be a single cursor slice. Found {cursor_slices}"
                )
            cursor_slice = cursor_slices[0] if cursor_slices else {}

            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
            )

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
Stream slicer that iterates over the cartesian product of input stream slicers. Given 2 stream slicers with the following slices: A: [{"i": 0}, {"i": 1}, {"i": 2}] B: [{"s": "hello"}, {"s": "world"}] the resulting stream slices are [ {"i": 0, "s": "hello"}, {"i": 0, "s": "world"}, {"i": 1, "s": "hello"}, {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"}, ]
Attributes:
- stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
67 def get_request_params( 68 self, 69 *, 70 stream_state: Optional[StreamState] = None, 71 stream_slice: Optional[StreamSlice] = None, 72 next_page_token: Optional[Mapping[str, Any]] = None, 73 ) -> Mapping[str, Any]: 74 return dict( 75 ChainMap( 76 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 77 s.get_request_params( 78 stream_state=stream_state, 79 stream_slice=stream_slice, 80 next_page_token=next_page_token, 81 ) 82 for s in self.stream_slicers 83 ] 84 ) 85 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
87 def get_request_headers( 88 self, 89 *, 90 stream_state: Optional[StreamState] = None, 91 stream_slice: Optional[StreamSlice] = None, 92 next_page_token: Optional[Mapping[str, Any]] = None, 93 ) -> Mapping[str, Any]: 94 return dict( 95 ChainMap( 96 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 97 s.get_request_headers( 98 stream_state=stream_state, 99 stream_slice=stream_slice, 100 next_page_token=next_page_token, 101 ) 102 for s in self.stream_slicers 103 ] 104 ) 105 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
107 def get_request_body_data( 108 self, 109 *, 110 stream_state: Optional[StreamState] = None, 111 stream_slice: Optional[StreamSlice] = None, 112 next_page_token: Optional[Mapping[str, Any]] = None, 113 ) -> Mapping[str, Any]: 114 return dict( 115 ChainMap( 116 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 117 s.get_request_body_data( 118 stream_state=stream_state, 119 stream_slice=stream_slice, 120 next_page_token=next_page_token, 121 ) 122 for s in self.stream_slicers 123 ] 124 ) 125 )
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the string will be sent as-is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
127 def get_request_body_json( 128 self, 129 *, 130 stream_state: Optional[StreamState] = None, 131 stream_slice: Optional[StreamSlice] = None, 132 next_page_token: Optional[Mapping[str, Any]] = None, 133 ) -> Mapping[str, Any]: 134 return dict( 135 ChainMap( 136 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 137 s.get_request_body_json( 138 stream_state=stream_state, 139 stream_slice=stream_slice, 140 next_page_token=next_page_token, 141 ) 142 for s in self.stream_slicers 143 ] 144 ) 145 )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
147 def stream_slices(self) -> Iterable[StreamSlice]: 148 sub_slices = (s.stream_slices() for s in self.stream_slicers) 149 product = itertools.product(*sub_slices) 150 for stream_slice_tuple in product: 151 partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple])) # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 152 cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice] 153 extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple])) # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 154 if len(cursor_slices) > 1: 155 raise ValueError( 156 f"There should only be a single cursor slice. Found {cursor_slices}" 157 ) 158 if cursor_slices: 159 cursor_slice = cursor_slices[0] 160 else: 161 cursor_slice = {} 162 yield StreamSlice( 163 partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields 164 )
Defines stream slices
Returns
An iterable of stream slices
166 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 167 """ 168 Parent stream states are not supported for cartesian product stream slicer 169 """ 170 pass
Parent stream states are not supported for cartesian product stream slicer
@dataclass
class GroupingPartitionRouter(PartitionRouter):
    """
    A partition router that groups partitions from an underlying partition router into batches of a specified size.
    This is useful for APIs that support filtering by multiple partition keys in a single request.

    Attributes:
        group_size (int): The number of partitions to include in each group.
        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
        deduplicate (bool): If True, a partition key already emitted in any earlier group is skipped (the seen-key set is kept for the whole stream, not reset per group).
        config (Config): The connector configuration.
    """

    group_size: int
    underlying_partition_router: PartitionRouter
    config: Config
    deduplicate: bool = True

    def __post_init__(self) -> None:
        # Snapshot of the underlying router's state, refreshed each time a group is emitted.
        self._state: Optional[Mapping[str, StreamState]] = {}

    def _partition_key(self, partition: StreamSlice) -> Any:
        """Return the single partition value of `partition`.

        The `parent_slice` entry added by SubstreamPartitionRouter is ignored; exactly one
        other key-value pair must remain, otherwise a ValueError is raised.
        """
        keys = [k for k in partition.partition.keys() if k != "parent_slice"]
        if len(keys) != 1:
            raise ValueError(
                f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
            )
        return partition.partition[keys[0]]

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Lazily groups partitions from the underlying partition router into batches of size `group_size`.

        Partitions are consumed one at a time and buffered; whenever the buffer reaches
        `group_size` (or the underlying router is exhausted with a non-empty buffer) a grouped
        slice is yielded and the underlying router's state is snapshotted.

        Yields:
            Iterable[StreamSlice]: One StreamSlice per group, each containing a batch of partition values.
        """
        buffer: list[StreamSlice] = []
        seen: set = set()

        for candidate in self.underlying_partition_router.stream_slices():
            key = self._partition_key(candidate)

            if self.deduplicate:
                if key in seen:
                    continue
                seen.add(key)

            buffer.append(candidate)

            if len(buffer) == self.group_size:
                # Snapshot state before emitting so get_stream_state reflects this group.
                self._state = self.underlying_partition_router.get_stream_state()
                yield self._create_grouped_slice(buffer)
                buffer = []

        self._state = self.underlying_partition_router.get_stream_state()
        if buffer:
            # Flush the final, possibly short, group.
            yield self._create_grouped_slice(buffer)

    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
        """
        Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.

        Args:
            batch (list[StreamSlice]): A list of StreamSlice objects to group.

        Returns:
            StreamSlice: A single StreamSlice with combined partition and extra field values.
        """
        # One list per partition key, aligned with the batch order.
        grouped_partition = {
            key: [member.partition.get(key) for member in batch]
            for key in batch[0].partition.keys()
        }

        # Aggregate extra fields the same way, but only if any member has them.
        extra_fields_dict: dict = {}
        if any(member.extra_fields for member in batch):
            field_names = set().union(
                *(member.extra_fields.keys() for member in batch if member.extra_fields)
            )
            extra_fields_dict = {
                name: [member.extra_fields.get(name) for member in batch]
                for name in field_names
            }

        return StreamSlice(
            partition=grouped_partition,
            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
            extra_fields=extra_fields_dict,
        )

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Grouped slices inject no query parameters; always returns an empty mapping."""
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Grouped slices inject no headers; always returns an empty mapping."""
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Grouped slices inject no form-body data; always returns an empty mapping."""
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Grouped slices inject no JSON body; always returns an empty mapping."""
        return {}

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """Delegate state retrieval to the underlying partition router."""
        return self._state
A partition router that groups partitions from an underlying partition router into batches of a specified size. This is useful for APIs that support filtering by multiple partition keys in a single request.
Attributes:
- group_size (int): The number of partitions to include in each group.
- underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
- deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
- config (Config): The connector configuration.
- parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
35 def stream_slices(self) -> Iterable[StreamSlice]: 36 """ 37 Lazily groups partitions from the underlying partition router into batches of size `group_size`. 38 39 This method processes partitions one at a time from the underlying router, maintaining a batch buffer. 40 When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice. 41 If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch. 42 43 Yields: 44 Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values. 45 """ 46 batch = [] 47 seen_keys = set() 48 49 # Iterate over partitions lazily from the underlying router 50 for partition in self.underlying_partition_router.stream_slices(): 51 # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value}) 52 partition_keys = list(partition.partition.keys()) 53 # skip parent_slice as it is part of SubstreamPartitionRouter partition 54 if "parent_slice" in partition_keys: 55 partition_keys.remove("parent_slice") 56 if len(partition_keys) != 1: 57 raise ValueError( 58 f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}" 59 ) 60 key = partition.partition[partition_keys[0]] 61 62 # Skip duplicates if deduplication is enabled 63 if self.deduplicate and key in seen_keys: 64 continue 65 66 # Add partition to the batch 67 batch.append(partition) 68 if self.deduplicate: 69 seen_keys.add(key) 70 71 # Yield the batch when it reaches the group_size 72 if len(batch) == self.group_size: 73 self._state = self.underlying_partition_router.get_stream_state() 74 yield self._create_grouped_slice(batch) 75 batch = [] # Reset the batch 76 77 self._state = self.underlying_partition_router.get_stream_state() 78 # Yield any remaining partitions if the batch isn't empty 79 if batch: 80 yield self._create_grouped_slice(batch)
Lazily groups partitions from the underlying partition router into batches of size group_size.
This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
When the buffer reaches group_size or the underlying router is exhausted, it yields a grouped slice.
If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
Yields:
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
112 def get_request_params( 113 self, 114 stream_state: Optional[StreamState] = None, 115 stream_slice: Optional[StreamSlice] = None, 116 next_page_token: Optional[Mapping[str, Any]] = None, 117 ) -> Mapping[str, Any]: 118 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
120 def get_request_headers( 121 self, 122 stream_state: Optional[StreamState] = None, 123 stream_slice: Optional[StreamSlice] = None, 124 next_page_token: Optional[Mapping[str, Any]] = None, 125 ) -> Mapping[str, Any]: 126 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
128 def get_request_body_data( 129 self, 130 stream_state: Optional[StreamState] = None, 131 stream_slice: Optional[StreamSlice] = None, 132 next_page_token: Optional[Mapping[str, Any]] = None, 133 ) -> Mapping[str, Any]: 134 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the string will be sent as-is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
136 def get_request_body_json( 137 self, 138 stream_state: Optional[StreamState] = None, 139 stream_slice: Optional[StreamSlice] = None, 140 next_page_token: Optional[Mapping[str, Any]] = None, 141 ) -> Mapping[str, Any]: 142 return {}
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class ListPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the values of a list.
    If values is a string, it is evaluated as an interpolated literal that must produce a list.

    Attributes:
        values (Union[str, List[str]]): The values to iterate over
        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
        config (Config): The user-provided configuration as specified by the source's spec
        request_option (Optional[RequestOption]): The request option to configure the HTTP request
    """

    values: Union[str, List[str]]
    cursor_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.values, str):
            # A string value is an interpolated expression that must evaluate to a list.
            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
                self.config
            )
        if isinstance(self.cursor_field, str):
            self._cursor_field = InterpolatedString(string=self.cursor_field, parameters=parameters)
        else:
            self._cursor_field = self.cursor_field

        self._cursor = None

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Return one slice per configured value, keyed by the evaluated cursor field."""
        return [
            StreamSlice(
                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
            )
            for slice_value in self.values
        ]

    def _get_request_option(
        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """Build the request options to inject for the current slice, or {} if not configured."""
        if not self.request_option or not stream_slice:
            return {}
        if self.request_option.inject_into != request_option_type:
            return {}

        slice_value = stream_slice.get(self._cursor_field.eval(self.config))
        if not slice_value:
            # NOTE(review): falsy slice values (0, "") are also skipped here — matches original behavior.
            return {}

        options: MutableMapping[str, Any] = {}
        self.request_option.inject_into_request(options, slice_value, self.config)
        return options

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass
Partition router that iterates over the values of a list. If values is a string, it is evaluated as a literal, and the resulting literal is asserted to be a list.
Attributes:
- values (Union[str, List[str]]): The values to iterate over
- cursor_field (Union[InterpolatedString, str]): The name of the cursor field
- config (Config): The user-provided configuration as specified by the source's spec
- request_option (Optional[RequestOption]): The request option to configure the HTTP request
50 def get_request_params( 51 self, 52 stream_state: Optional[StreamState] = None, 53 stream_slice: Optional[StreamSlice] = None, 54 next_page_token: Optional[Mapping[str, Any]] = None, 55 ) -> Mapping[str, Any]: 56 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 57 return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
59 def get_request_headers( 60 self, 61 stream_state: Optional[StreamState] = None, 62 stream_slice: Optional[StreamSlice] = None, 63 next_page_token: Optional[Mapping[str, Any]] = None, 64 ) -> Mapping[str, Any]: 65 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 66 return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
68 def get_request_body_data( 69 self, 70 stream_state: Optional[StreamState] = None, 71 stream_slice: Optional[StreamSlice] = None, 72 next_page_token: Optional[Mapping[str, Any]] = None, 73 ) -> Mapping[str, Any]: 74 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 75 return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns a string, the string will be sent as-is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
77 def get_request_body_json( 78 self, 79 stream_state: Optional[StreamState] = None, 80 stream_slice: Optional[StreamSlice] = None, 81 next_page_token: Optional[Mapping[str, Any]] = None, 82 ) -> Mapping[str, Any]: 83 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 84 return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """Partition router returning only a stream slice"""

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """A single empty partition injects no query parameters."""
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """A single empty partition injects no headers."""
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """A single empty partition injects no form-body data."""
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """A single empty partition injects no JSON body."""
        return {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield exactly one empty slice."""
        yield StreamSlice(partition={}, cursor_slice={})

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass
Partition router returning only a stream slice
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Specify query parameters for an outgoing HTTP request.

    A single-partition stream never contributes query parameters, so the
    result is always an empty mapping regardless of the inputs.
    """
    return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Return non-auth request headers.

    A single-partition stream has no partition-derived headers; the result
    is always empty. Authentication headers are handled elsewhere.
    """
    return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Specify the non-JSON request body payload.

    Empty for a single-partition stream: there is no partition data to send.
    """
    return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns text, the body is sent as is. If it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Specify the JSON request body payload.

    Empty for a single-partition stream: there is no partition data to send.
    """
    return {}
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
@dataclass
class SubstreamPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the parent's stream records and emits slices
    Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components

    Attributes:
        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
    """

    parent_stream_configs: List[ParentStreamConfig]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # A substream cannot exist without at least one parent to derive slices from.
        if not self.parent_stream_configs:
            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        # Collect, for every parent config that injects into `option_type`, the value
        # stored in the slice under that parent's partition field. Falsy values are
        # skipped, so empty partition values never produce request options.
        params: MutableMapping[str, Any] = {}
        if stream_slice:
            for parent_config in self.parent_stream_configs:
                if (
                    parent_config.request_option
                    and parent_config.request_option.inject_into == option_type
                ):
                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                    value = stream_slice.get(key)
                    if value:
                        parent_config.request_option.inject_into_request(params, value, self.config)
        return params

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its stream_slices.
        For each stream slice, iterate over each record.
        yield a stream slice for each such records.

        If a parent slice contains no record, emit a slice with parent_record=None.

        The template string can interpolate the following values:
        - parent_stream_slice: mapping representing the parent's stream slice
        - parent_record: mapping representing the parent record
        - parent_stream_name: string representing the parent stream name
        """
        if not self.parent_stream_configs:
            yield from []
        else:
            for parent_stream_config in self.parent_stream_configs:
                parent_stream = parent_stream_config.stream
                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                extra_fields = None
                if parent_stream_config.extra_fields:
                    # Pre-evaluate interpolated field paths once per parent, not per record.
                    extra_fields = [
                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                        for field_path in parent_stream_config.extra_fields
                    ]

                for partition, is_last_slice in iterate_with_last_flag(
                    parent_stream.generate_partitions()
                ):
                    if partition is None:
                        break
                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                        partition.read()
                    ):
                        emit_slice = parent_record is not None
                        if parent_record is not None:
                            # In the previous CDK implementation, state management was done internally by the stream.
                            # However, this could cause issues when doing availability check for example as the availability
                            # check would progress the state so state management was moved outside of the read method.
                            # Hence, we need to call the cursor here.
                            # Note that we call observe and close_partition before emitting the associated record as the
                            # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                            # record was consumed.
                            parent_stream.cursor.observe(parent_record)
                            parent_partition = (
                                parent_record.associated_slice.partition
                                if parent_record.associated_slice
                                else {}
                            )
                            record_data = parent_record.data

                            try:
                                partition_value = dpath.get(
                                    record_data,  # type: ignore [arg-type]
                                    parent_field,
                                )
                            except KeyError:
                                # FIXME a log here would go a long way for debugging
                                emit_slice = False

                            if emit_slice:
                                # Add extra fields
                                extracted_extra_fields = self._extract_extra_fields(
                                    record_data, extra_fields
                                )

                                if parent_stream_config.lazy_read_pointer:
                                    extracted_extra_fields = {
                                        "child_response": self._extract_child_response(
                                            record_data,
                                            parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                                        ),
                                        **extracted_extra_fields,
                                    }

                        if is_last_record_in_slice:
                            parent_stream.cursor.close_partition(partition)
                            if is_last_slice:
                                parent_stream.cursor.ensure_at_least_one_state_emitted()

                        if emit_slice:
                            yield StreamSlice(
                                partition={
                                    partition_field: partition_value,
                                    "parent_slice": parent_partition or {},
                                },
                                cursor_slice={},
                                extra_fields=extracted_extra_fields,
                            )

        yield from []

    def _extract_child_response(
        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
    ) -> requests.Response:
        """Extract child records from a parent record based on lazy pointers."""

        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
            """Create a SafeResponse with the given data."""
            response = SafeResponse()
            response.content = json.dumps(data).encode("utf-8")
            response.status_code = 200
            return response

        path = [path.eval(self.config) for path in pointer]
        return _create_response(dpath.get(parent_record, path, default=[]))  # type: ignore # argument will be a MutableMapping, given input data structure

    def _extract_extra_fields(
        self,
        parent_record: Mapping[str, Any] | AirbyteMessage,
        extra_fields: Optional[List[List[str]]] = None,
    ) -> Mapping[str, Any]:
        """
        Extracts additional fields specified by their paths from the parent record.

        Args:
            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.

        Returns:
            Mapping[str, Any]: A dictionary containing the extracted fields.
                The keys are the joined field paths, and the values are the corresponding extracted values.
        """
        extracted_extra_fields = {}
        if extra_fields:
            for extra_field_path in extra_fields:
                try:
                    extra_field_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        extra_field_path,
                    )
                    self.logger.debug(
                        f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}"
                    )
                except KeyError:
                    # Missing paths are tolerated: the key is still emitted with a None value.
                    self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}")
                    extra_field_value = None
                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
        return extracted_extra_fields

    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
        """
        Migrate the child or global stream state into the parent stream's state format.

        This method converts the child stream state—or, if present, the global state—into a format that is
        compatible with parent streams that use incremental synchronization. The migration occurs only for
        parent streams with incremental dependencies. It filters out per-partition states and retains only the
        global state in the form {cursor_field: cursor_value}.

        The method supports multiple input formats:
          - A simple global state, e.g.:
              {"updated_at": "2023-05-27T00:00:00Z"}
          - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
              {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
            In this case, the migration uses the first value from the "state" dictionary.
          - Any per-partition state formats or other non-simple structures are ignored during migration.

        Args:
            stream_state (StreamState): The state to migrate.

        Returns:
            StreamState: A migrated state for parent streams in the format:
                {
                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
                }
            where each parent stream with an incremental dependency is assigned its corresponding cursor value.

        Example:
            Input: {"updated_at": "2023-05-27T00:00:00Z"}
            Output: {
                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
            }
        """
        substream_state_values = list(stream_state.values())
        substream_state = substream_state_values[0] if substream_state_values else {}

        # Ignore per-partition states or invalid formats.
        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
            # If a global state is present under the key "state", use its first value.
            if (
                "state" in stream_state
                and isinstance(stream_state["state"], dict)
                and stream_state["state"] != {}
            ):
                substream_state = list(stream_state["state"].values())[0]
            else:
                return {}

        # Build the parent state for all parent streams with incremental dependencies.
        parent_state = {}
        if substream_state:
            for parent_config in self.parent_stream_configs:
                if parent_config.incremental_dependency:
                    parent_state[parent_config.stream.name] = {
                        parent_config.stream.cursor_field: substream_state
                    }

        return parent_state

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
            {
                "parent_stream_name1": {
                    "last_updated": "2023-05-27T00:00:00Z"
                },
                "parent_stream_name2": {
                    "last_updated": "2023-05-27T00:00:00Z"
                }
            }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                # Deep copy so the caller cannot mutate the live cursor state.
                parent_state[parent_config.stream.name] = copy.deepcopy(
                    parent_config.stream.cursor.state
                )
        return parent_state

    @property
    def logger(self) -> logging.Logger:
        # Named logger so substream routing can be filtered in connector logs.
        return logging.getLogger("airbyte.SubstreamPartitionRouter")
Partition router that iterates over the parent's stream records and emits slices
Will populate the state with partition_field and parent_slice so they can be accessed by other components
Attributes:
- parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
def get_request_params(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Build the query parameters injected for the current parent partition.

    Uses the slice passed as an argument instead of the cursor's slice,
    because the cursor only advances after the response is processed.
    """
    injected = self._get_request_option(RequestOptionType.request_parameter, stream_slice)
    return injected
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
def get_request_headers(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Build the non-auth headers injected for the current parent partition.

    Uses the slice passed as an argument instead of the cursor's slice,
    because the cursor only advances after the response is processed.
    """
    injected = self._get_request_option(RequestOptionType.header, stream_slice)
    return injected
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
def get_request_body_data(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Build the non-JSON body options injected for the current parent partition.

    Uses the slice passed as an argument instead of the cursor's slice,
    because the cursor only advances after the response is processed.
    """
    injected = self._get_request_option(RequestOptionType.body_data, stream_slice)
    return injected
Specifies how to populate the body of the request with a non-JSON payload.
If it returns text, the body is sent as is. If it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def get_request_body_json(
    self,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> Mapping[str, Any]:
    """Build the JSON body options injected for the current parent partition.

    Uses the slice passed as an argument instead of the cursor's slice,
    because the cursor only advances after the response is processed.
    """
    injected = self._get_request_option(RequestOptionType.body_json, stream_slice)
    return injected
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Iterate over each parent stream's record and create a StreamSlice for each record.

    For each stream, iterate over its stream_slices.
    For each stream slice, iterate over each record.
    yield a stream slice for each such records.

    If a parent slice contains no record, emit a slice with parent_record=None.

    The template string can interpolate the following values:
    - parent_stream_slice: mapping representing the parent's stream slice
    - parent_record: mapping representing the parent record
    - parent_stream_name: string representing the parent stream name
    """
    if not self.parent_stream_configs:
        yield from []
    else:
        for parent_stream_config in self.parent_stream_configs:
            parent_stream = parent_stream_config.stream
            parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
            partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
            extra_fields = None
            if parent_stream_config.extra_fields:
                # Pre-evaluate interpolated field paths once per parent, not per record.
                extra_fields = [
                    [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                    for field_path in parent_stream_config.extra_fields
                ]

            for partition, is_last_slice in iterate_with_last_flag(
                parent_stream.generate_partitions()
            ):
                if partition is None:
                    break
                for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                    partition.read()
                ):
                    emit_slice = parent_record is not None
                    if parent_record is not None:
                        # In the previous CDK implementation, state management was done internally by the stream.
                        # However, this could cause issues when doing availability check for example as the availability
                        # check would progress the state so state management was moved outside of the read method.
                        # Hence, we need to call the cursor here.
                        # Note that we call observe and close_partition before emitting the associated record as the
                        # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                        # record was consumed.
                        parent_stream.cursor.observe(parent_record)
                        parent_partition = (
                            parent_record.associated_slice.partition
                            if parent_record.associated_slice
                            else {}
                        )
                        record_data = parent_record.data

                        try:
                            partition_value = dpath.get(
                                record_data,  # type: ignore [arg-type]
                                parent_field,
                            )
                        except KeyError:
                            # FIXME a log here would go a long way for debugging
                            emit_slice = False

                        if emit_slice:
                            # Add extra fields
                            extracted_extra_fields = self._extract_extra_fields(
                                record_data, extra_fields
                            )

                            if parent_stream_config.lazy_read_pointer:
                                extracted_extra_fields = {
                                    "child_response": self._extract_child_response(
                                        record_data,
                                        parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                                    ),
                                    **extracted_extra_fields,
                                }

                    if is_last_record_in_slice:
                        parent_stream.cursor.close_partition(partition)
                        if is_last_slice:
                            parent_stream.cursor.ensure_at_least_one_state_emitted()

                    if emit_slice:
                        yield StreamSlice(
                            partition={
                                partition_field: partition_value,
                                "parent_slice": parent_partition or {},
                            },
                            cursor_slice={},
                            extra_fields=extracted_extra_fields,
                        )

    yield from []
Iterate over each parent stream's record and create a StreamSlice for each record.
For each stream, iterate over its stream_slices. For each stream slice, iterate over each record. Yield a stream slice for each such record.
If a parent slice contains no record, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """Snapshot the current state of every incremental parent stream.

    Returns:
        Mapping of parent stream name to a deep copy of that stream's cursor
        state, e.g.:
            {
                "parent_stream_name1": {"last_updated": "2023-05-27T00:00:00Z"},
                "parent_stream_name2": {"last_updated": "2023-05-27T00:00:00Z"}
            }
        Parents without an incremental dependency are omitted.
    """
    snapshot = {}
    for cfg in self.parent_stream_configs:
        if not cfg.incremental_dependency:
            continue
        # Deep-copy so callers cannot mutate the live cursor state.
        snapshot[cfg.stream.name] = copy.deepcopy(cfg.stream.cursor.state)
    return snapshot
Get the state of the parent streams.
Returns:
StreamState: The current state of the parent streams.
Example of state format: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }
@dataclass
class PartitionRouter(StreamSlicer):
    """
    Base class for partition routers.

    A partition router is a stream slicer that may depend on one or more
    parent streams; subclasses expose that dependency through get_stream_state().

    Methods:
        get_stream_state(): Get the state of the parent streams.
    """

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Returns:
            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
            The returned format will be:
                {
                    "parent_stream_name1": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_stream_name2": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
        """
Base class for partition routers.
Methods:
get_stream_state(): Get the state of the parent streams.
@abstractmethod
def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
    """Return the current state of the parent streams, keyed by parent stream name.

    Only implement this when the slicer wraps one or more parent streams that
    must be read incrementally using their state; otherwise there is nothing
    to report. The expected return shape is:
        {
            "parent_stream_name1": {"last_updated": "2023-05-27T00:00:00Z"},
            "parent_stream_name2": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    """
Get the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Returns:
Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }