airbyte_cdk.sources.declarative.partition_routers
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
    AsyncJobPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import (
    CartesianProductStreamSlicer,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
    GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import (
    ListPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import (
    SinglePartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
    SubstreamPartitionRouter,
)

__all__ = [
    "AsyncJobPartitionRouter",
    "CartesianProductStreamSlicer",
    "GroupingPartitionRouter",
    "ListPartitionRouter",
    "SinglePartitionRouter",
    "SubstreamPartitionRouter",
    "PartitionRouter",
]
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Partition router that creates async jobs in a source API, periodically polls for job
    completion, and supplies the completed job URL locations as stream slices so that
    records can be extracted.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._job_orchestrator_factory = self.job_orchestrator_factory
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
        self._parameters = parameters

    def stream_slices(self) -> Iterable[StreamSlice]:
        slices = self.stream_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(slices)

        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
            yield StreamSlice(
                partition=dict(completed_partition.stream_slice.partition),
                cursor_slice=completed_partition.stream_slice.cursor_slice,
                extra_fields={"jobs": list(completed_partition.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
        be responsible for. However, this was added in because the JobOrchestrator is required to
        retrieve records. And without defining fetch_records() on this class, we're stuck with either
        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
        """

        if not self._job_orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.
    def stream_slices(self) -> Iterable[StreamSlice]:
        slices = self.stream_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(slices)

        for completed_partition in self._job_orchestrator.create_and_get_completed_partitions():
            yield StreamSlice(
                partition=dict(completed_partition.stream_slice.partition),
                cursor_slice=completed_partition.stream_slice.cursor_slice,
                extra_fields={"jobs": list(completed_partition.jobs)},
            )
Defines stream slices
Returns
An iterable of stream slices
    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
        be responsible for. However, this was added in because the JobOrchestrator is required to
        retrieve records. And without defining fetch_records() on this class, we're stuck with either
        passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
        """

        if not self._job_orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, it was added because the JobOrchestrator is required to retrieve records; without defining fetch_records() on this class, we would have to either pass the JobOrchestrator to the AsyncRetriever or store it on multiple classes.
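For orientation, here is a minimal sketch of the intended call order, assuming a fully wired router (the config, parameters, and job_orchestrator_factory setup is elided): stream_slices() must run first so the orchestrator exists, and each emitted slice carries its completed jobs under extra_fields["jobs"], which is exactly what fetch_records() expects back.

from typing import Any, Iterable, Mapping

from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter


def read_async_records(router: AsyncJobPartitionRouter) -> Iterable[Mapping[str, Any]]:
    # stream_slices() creates the orchestrator, submits the async jobs,
    # and yields one slice per completed partition.
    for stream_slice in router.stream_slices():
        # Hand the completed jobs back to the router to extract records.
        yield from router.fetch_records(async_jobs=stream_slice.extra_fields["jobs"])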
@dataclass
class CartesianProductStreamSlicer(PartitionRouter):
    """
    Stream slicers that iterates over the cartesian product of input stream slicers
    Given 2 stream slicers with the following slices:
    A: [{"i": 0}, {"i": 1}, {"i": 2}]
    B: [{"s": "hello"}, {"s": "world"}]
    the resulting stream slices are
    [
        {"i": 0, "s": "hello"},
        {"i": 0, "s": "world"},
        {"i": 1, "s": "hello"},
        {"i": 1, "s": "world"},
        {"i": 2, "s": "hello"},
        {"i": 2, "s": "world"},
    ]

    Attributes:
        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
    """

    stream_slicers: List[PartitionRouter]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_params(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_headers(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_data(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_json(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        sub_slices = (s.stream_slices() for s in self.stream_slicers)
        product = itertools.product(*sub_slices)
        for stream_slice_tuple in product:
            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
            if len(cursor_slices) > 1:
                raise ValueError(
                    f"There should only be a single cursor slice. Found {cursor_slices}"
                )
            if cursor_slices:
                cursor_slice = cursor_slices[0]
            else:
                cursor_slice = {}
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
            )

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
Stream slicer that iterates over the Cartesian product of the input stream slicers.

Given two stream slicers with the following slices:

A: [{"i": 0}, {"i": 1}, {"i": 2}]
B: [{"s": "hello"}, {"s": "world"}]

the resulting stream slices are:

[
    {"i": 0, "s": "hello"},
    {"i": 0, "s": "world"},
    {"i": 1, "s": "hello"},
    {"i": 1, "s": "world"},
    {"i": 2, "s": "hello"},
    {"i": 2, "s": "world"},
]
Attributes:
- stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g. request headers, parameters, etc.) returned by this slicer are the combination of the RequestOptions of its input slicers. If there is a conflict, e.g. two slicers define the same header or request parameter, it is resolved by taking the value from the first slicer, where ordering is determined by the order in which the slicers were passed to this composite slicer.
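A minimal sketch of composing two slicers, mirroring the A/B example above; the values and cursor fields passed to the ListPartitionRouter instances are illustrative (and, unlike the docstring example, list values are strings here):

from airbyte_cdk.sources.declarative.partition_routers import (
    CartesianProductStreamSlicer,
    ListPartitionRouter,
)

slicer_a = ListPartitionRouter(values=["0", "1", "2"], cursor_field="i", config={}, parameters={})
slicer_b = ListPartitionRouter(values=["hello", "world"], cursor_field="s", config={}, parameters={})

product = CartesianProductStreamSlicer(stream_slicers=[slicer_a, slicer_b], parameters={})
for stream_slice in product.stream_slices():
    print(stream_slice.partition)
# {'i': '0', 's': 'hello'}
# {'i': '0', 's': 'world'}
# {'i': '1', 's': 'hello'}
# ... and so on through {'i': '2', 's': 'world'}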
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_params(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.
    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_headers(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_data(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )
Specifies how to populate the body of the request with a non-JSON payload.

If this returns plain text, it is sent as-is; if it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
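The dict-to-form conversion described above is ordinary URL encoding; a quick stdlib check reproduces the documented example:

from urllib.parse import urlencode

body_data = {"key1": "value1", "key2": "value2"}
print(urlencode(body_data))  # key1=value1&key2=value2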
    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return dict(
            ChainMap(
                *[  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
                    s.get_request_body_json(
                        stream_state=stream_state,
                        stream_slice=stream_slice,
                        next_page_token=next_page_token,
                    )
                    for s in self.stream_slicers
                ]
            )
        )
Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def stream_slices(self) -> Iterable[StreamSlice]:
        sub_slices = (s.stream_slices() for s in self.stream_slicers)
        product = itertools.product(*sub_slices)
        for stream_slice_tuple in product:
            partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
            cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice]
            extra_fields = dict(ChainMap(*[s.extra_fields for s in stream_slice_tuple]))  # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons
            if len(cursor_slices) > 1:
                raise ValueError(
                    f"There should only be a single cursor slice. Found {cursor_slices}"
                )
            if cursor_slices:
                cursor_slice = cursor_slices[0]
            else:
                cursor_slice = {}
            yield StreamSlice(
                partition=partition, cursor_slice=cursor_slice, extra_fields=extra_fields
            )
Defines stream slices
Returns
An iterable of stream slices
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass
Parent stream states are not supported by the cartesian product stream slicer.
@dataclass
class GroupingPartitionRouter(PartitionRouter):
    """
    A partition router that groups partitions from an underlying partition router into batches of a specified size.
    This is useful for APIs that support filtering by multiple partition keys in a single request.

    Attributes:
        group_size (int): The number of partitions to include in each group.
        underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
        deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
        config (Config): The connector configuration.
        parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
    """

    group_size: int
    underlying_partition_router: PartitionRouter
    config: Config
    deduplicate: bool = True

    def __post_init__(self) -> None:
        self._state: Optional[Mapping[str, StreamState]] = {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Lazily groups partitions from the underlying partition router into batches of size `group_size`.

        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

        Yields:
            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
        """
        batch = []
        seen_keys = set()

        # Iterate over partitions lazily from the underlying router
        for partition in self.underlying_partition_router.stream_slices():
            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
            partition_keys = list(partition.partition.keys())
            # skip parent_slice as it is part of SubstreamPartitionRouter partition
            if "parent_slice" in partition_keys:
                partition_keys.remove("parent_slice")
            if len(partition_keys) != 1:
                raise ValueError(
                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
                )
            key = partition.partition[partition_keys[0]]

            # Skip duplicates if deduplication is enabled
            if self.deduplicate and key in seen_keys:
                continue

            # Add partition to the batch
            batch.append(partition)
            if self.deduplicate:
                seen_keys.add(key)

            # Yield the batch when it reaches the group_size
            if len(batch) == self.group_size:
                self._state = self.underlying_partition_router.get_stream_state()
                yield self._create_grouped_slice(batch)
                batch = []  # Reset the batch

        self._state = self.underlying_partition_router.get_stream_state()
        # Yield any remaining partitions if the batch isn't empty
        if batch:
            yield self._create_grouped_slice(batch)

    def _create_grouped_slice(self, batch: list[StreamSlice]) -> StreamSlice:
        """
        Creates a grouped StreamSlice from a batch of partitions, aggregating extra fields into a dictionary with list values.

        Args:
            batch (list[StreamSlice]): A list of StreamSlice objects to group.

        Returns:
            StreamSlice: A single StreamSlice with combined partition and extra field values.
        """
        # Combine partition values into a single dict with lists
        grouped_partition = {
            key: [p.partition.get(key) for p in batch] for key in batch[0].partition.keys()
        }

        # Aggregate extra fields into a dict with list values
        extra_fields_dict = (
            {
                key: [p.extra_fields.get(key) for p in batch]
                for key in set().union(*(p.extra_fields.keys() for p in batch if p.extra_fields))
            }
            if any(p.extra_fields for p in batch)
            else {}
        )
        return StreamSlice(
            partition=grouped_partition,
            cursor_slice={},  # Cursor is managed by the underlying router or incremental sync
            extra_fields=extra_fields_dict,
        )

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """Delegate state retrieval to the underlying partition router."""
        return self._state
A partition router that groups partitions from an underlying partition router into batches of a specified size. This is useful for APIs that support filtering by multiple partition keys in a single request.
Attributes:
- group_size (int): The number of partitions to include in each group.
- underlying_partition_router (PartitionRouter): The partition router whose output will be grouped.
- deduplicate (bool): If True, ensures unique partitions within each group by removing duplicates based on the partition key.
- config (Config): The connector configuration.
- parameters (Mapping[str, Any]): Additional parameters for interpolation and configuration.
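A minimal sketch of grouping a list-based router into batches of two; the `board_ids` field name echoes the example in the implementation comments and is illustrative:

from airbyte_cdk.sources.declarative.partition_routers import (
    GroupingPartitionRouter,
    ListPartitionRouter,
)

boards = ListPartitionRouter(values=["a", "b", "c"], cursor_field="board_ids", config={}, parameters={})
grouped = GroupingPartitionRouter(group_size=2, underlying_partition_router=boards, config={})
for stream_slice in grouped.stream_slices():
    print(stream_slice.partition)
# {'board_ids': ['a', 'b']}
# {'board_ids': ['c']}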
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Lazily groups partitions from the underlying partition router into batches of size `group_size`.

        This method processes partitions one at a time from the underlying router, maintaining a batch buffer.
        When the buffer reaches `group_size` or the underlying router is exhausted, it yields a grouped slice.
        If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.

        Yields:
            Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
        """
        batch = []
        seen_keys = set()

        # Iterate over partitions lazily from the underlying router
        for partition in self.underlying_partition_router.stream_slices():
            # Extract the partition key (assuming single key-value pair, e.g., {"board_ids": value})
            partition_keys = list(partition.partition.keys())
            # skip parent_slice as it is part of SubstreamPartitionRouter partition
            if "parent_slice" in partition_keys:
                partition_keys.remove("parent_slice")
            if len(partition_keys) != 1:
                raise ValueError(
                    f"GroupingPartitionRouter expects a single partition key-value pair. Got {partition.partition}"
                )
            key = partition.partition[partition_keys[0]]

            # Skip duplicates if deduplication is enabled
            if self.deduplicate and key in seen_keys:
                continue

            # Add partition to the batch
            batch.append(partition)
            if self.deduplicate:
                seen_keys.add(key)

            # Yield the batch when it reaches the group_size
            if len(batch) == self.group_size:
                self._state = self.underlying_partition_router.get_stream_state()
                yield self._create_grouped_slice(batch)
                batch = []  # Reset the batch

        self._state = self.underlying_partition_router.get_stream_state()
        # Yield any remaining partitions if the batch isn't empty
        if batch:
            yield self._create_grouped_slice(batch)
Lazily groups partitions from the underlying partition router into batches of size group_size.

This method processes partitions one at a time from the underlying router, maintaining a batch buffer. When the buffer reaches group_size or the underlying router is exhausted, it yields a grouped slice. If deduplication is enabled, it tracks seen partition keys to ensure uniqueness within the current batch.
Yields:
Iterable[StreamSlice]: An iterable of StreamSlice objects, where each slice contains a batch of partition values.
    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.
    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies how to populate the body of the request with a non-JSON payload.

If this returns plain text, it is sent as-is; if it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
@dataclass
class ListPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the values of a list
    If values is a string, then evaluate it as literal and assert the resulting literal is a list

    Attributes:
        values (Union[str, List[str]]): The values to iterate over
        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
        config (Config): The user-provided configuration as specified by the source's spec
        request_option (Optional[RequestOption]): The request option to configure the HTTP request
    """

    values: Union[str, List[str]]
    cursor_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.values, str):
            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
                self.config
            )
        self._cursor_field = (
            InterpolatedString(string=self.cursor_field, parameters=parameters)
            if isinstance(self.cursor_field, str)
            else self.cursor_field
        )

        self._cursor = None

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def stream_slices(self) -> Iterable[StreamSlice]:
        return [
            StreamSlice(
                partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={}
            )
            for slice_value in self.values
        ]

    def _get_request_option(
        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        if (
            self.request_option
            and self.request_option.inject_into == request_option_type
            and stream_slice
        ):
            slice_value = stream_slice.get(self._cursor_field.eval(self.config))
            if slice_value:
                options: MutableMapping[str, Any] = {}
                self.request_option.inject_into_request(options, slice_value, self.config)
                return options
            else:
                return {}
        else:
            return {}

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass
Partition router that iterates over the values of a list. If values is a string, it is evaluated as a literal, and the resulting literal must be a list.
Attributes:
- values (Union[str, List[str]]): The values to iterate over
- cursor_field (Union[InterpolatedString, str]): The name of the cursor field
- config (Config): The user-provided configuration as specified by the source's spec
- request_option (Optional[RequestOption]): The request option to configure the HTTP request
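A minimal sketch of both supported forms of `values`; the config key `repositories` is hypothetical. When `values` is a string, it is interpolated against the config and must evaluate to a list:

from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter

router = ListPartitionRouter(
    values="{{ config['repositories'] }}",  # interpolated; must evaluate to a list
    cursor_field="repository",
    config={"repositories": ["alpha", "beta"]},
    parameters={},
)
for stream_slice in router.stream_slices():
    print(stream_slice.partition)
# {'repository': 'alpha'}
# {'repository': 'beta'}

Passing a plain list, e.g. values=["alpha", "beta"], skips the interpolation step and yields the same slices.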
    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.
    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.

If this returns plain text, it is sent as-is; if it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """Partition router returning only a stream slice"""

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        yield StreamSlice(partition={}, cursor_slice={})

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass
Partition router that returns a single stream slice.
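This router exists mainly as a default (for example, AsyncJobPartitionRouter's stream_slicer field above falls back to it), and its behavior is a single empty slice:

from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter

router = SinglePartitionRouter(parameters={})
for stream_slice in router.stream_slices():
    print(stream_slice.partition, stream_slice.cursor_slice)  # {} {}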
    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.
    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies how to populate the body of the request with a non-JSON payload.

If this returns plain text, it is sent as-is; if it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}
Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
@dataclass
class SubstreamPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the parent's stream records and emits slices
    Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components

    Attributes:
        parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
    """

    parent_stream_configs: List[ParentStreamConfig]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if not self.parent_stream_configs:
            raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream")
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def _get_request_option(
        self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        params: MutableMapping[str, Any] = {}
        if stream_slice:
            for parent_config in self.parent_stream_configs:
                if (
                    parent_config.request_option
                    and parent_config.request_option.inject_into == option_type
                ):
                    key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                    value = stream_slice.get(key)
                    if value:
                        parent_config.request_option.inject_into_request(params, value, self.config)
        return params

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its stream_slices.
        For each stream slice, iterate over each record.
        yield a stream slice for each such records.

        If a parent slice contains no record, emit a slice with parent_record=None.

        The template string can interpolate the following values:
        - parent_stream_slice: mapping representing the parent's stream slice
        - parent_record: mapping representing the parent record
        - parent_stream_name: string representing the parent stream name
        """
        if not self.parent_stream_configs:
            yield from []
        else:
            for parent_stream_config in self.parent_stream_configs:
                parent_stream = parent_stream_config.stream
                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                extra_fields = None
                if parent_stream_config.extra_fields:
                    extra_fields = [
                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                        for field_path in parent_stream_config.extra_fields
                    ]

                for partition, is_last_slice in iterate_with_last_flag(
                    parent_stream.generate_partitions()
                ):
                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                        partition.read()
                    ):
                        # In the previous CDK implementation, state management was done internally by the stream.
                        # However, this could cause issues when doing availability check for example as the availability
                        # check would progress the state so state management was moved outside of the read method.
                        # Hence, we need to call the cursor here.
                        # Note that we call observe and close_partition before emitting the associated record as the
                        # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                        # record was consumed.
                        parent_stream.cursor.observe(parent_record)
                        parent_partition = (
                            parent_record.associated_slice.partition
                            if parent_record.associated_slice
                            else {}
                        )
                        record_data = parent_record.data

                        try:
                            partition_value = dpath.get(
                                record_data,  # type: ignore [arg-type]
                                parent_field,
                            )
                        except KeyError:
                            # FIXME a log here would go a long way for debugging
                            continue

                        # Add extra fields
                        extracted_extra_fields = self._extract_extra_fields(
                            record_data, extra_fields
                        )

                        if parent_stream_config.lazy_read_pointer:
                            extracted_extra_fields = {
                                "child_response": self._extract_child_response(
                                    record_data,
                                    parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
                                ),
                                **extracted_extra_fields,
                            }

                        if is_last_record_in_slice:
                            parent_stream.cursor.close_partition(partition)
                            if is_last_slice:
                                parent_stream.cursor.ensure_at_least_one_state_emitted()

                        yield StreamSlice(
                            partition={
                                partition_field: partition_value,
                                "parent_slice": parent_partition or {},
                            },
                            cursor_slice={},
                            extra_fields=extracted_extra_fields,
                        )

            yield from []

    def _extract_child_response(
        self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
    ) -> requests.Response:
        """Extract child records from a parent record based on lazy pointers."""

        def _create_response(data: MutableMapping[str, Any]) -> SafeResponse:
            """Create a SafeResponse with the given data."""
            response = SafeResponse()
            response.content = json.dumps(data).encode("utf-8")
            response.status_code = 200
            return response

        path = [path.eval(self.config) for path in pointer]
        return _create_response(dpath.get(parent_record, path, default=[]))  # type: ignore # argunet will be a MutableMapping, given input data structure

    def _extract_extra_fields(
        self,
        parent_record: Mapping[str, Any] | AirbyteMessage,
        extra_fields: Optional[List[List[str]]] = None,
    ) -> Mapping[str, Any]:
        """
        Extracts additional fields specified by their paths from the parent record.

        Args:
            parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from.
            extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record.

        Returns:
            Mapping[str, Any]: A dictionary containing the extracted fields.
                The keys are the joined field paths, and the values are the corresponding extracted values.
        """
        extracted_extra_fields = {}
        if extra_fields:
            for extra_field_path in extra_fields:
                try:
                    extra_field_value = dpath.get(
                        parent_record,  # type: ignore [arg-type]
                        extra_field_path,
                    )
                    self.logger.debug(
                        f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}"
                    )
                except KeyError:
                    self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}")
                    extra_field_value = None
                extracted_extra_fields[".".join(extra_field_path)] = extra_field_value
        return extracted_extra_fields

    def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState:
        """
        Migrate the child or global stream state into the parent stream's state format.

        This method converts the child stream state—or, if present, the global state—into a format that is
        compatible with parent streams that use incremental synchronization. The migration occurs only for
        parent streams with incremental dependencies. It filters out per-partition states and retains only the
        global state in the form {cursor_field: cursor_value}.

        The method supports multiple input formats:
        - A simple global state, e.g.:
          {"updated_at": "2023-05-27T00:00:00Z"}
        - A state object that contains a "state" key (which is assumed to hold the global state), e.g.:
          {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
          In this case, the migration uses the first value from the "state" dictionary.
        - Any per-partition state formats or other non-simple structures are ignored during migration.

        Args:
            stream_state (StreamState): The state to migrate. Expected formats include:
                - {"updated_at": "2023-05-27T00:00:00Z"}
                - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...}
                  (In this format, only the first global state value is used, and per-partition states are ignored.)

        Returns:
            StreamState: A migrated state for parent streams in the format:
                {
                    "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
                }
                where each parent stream with an incremental dependency is assigned its corresponding cursor value.

        Example:
            Input: {"updated_at": "2023-05-27T00:00:00Z"}
            Output: {
                "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"}
            }
        """
        substream_state_values = list(stream_state.values())
        substream_state = substream_state_values[0] if substream_state_values else {}

        # Ignore per-partition states or invalid formats.
        if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1:
            # If a global state is present under the key "state", use its first value.
            if (
                "state" in stream_state
                and isinstance(stream_state["state"], dict)
                and stream_state["state"] != {}
            ):
                substream_state = list(stream_state["state"].values())[0]
            else:
                return {}

        # Build the parent state for all parent streams with incremental dependencies.
        parent_state = {}
        if substream_state:
            for parent_config in self.parent_stream_configs:
                if parent_config.incremental_dependency:
                    parent_state[parent_config.stream.name] = {
                        parent_config.stream.cursor_field: substream_state
                    }

        return parent_state

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                parent_state[parent_config.stream.name] = copy.deepcopy(
                    parent_config.stream.cursor.state
                )
        return parent_state

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.SubstreamPartitionRouter")
Partition router that iterates over the parent stream's records and emits slices. It populates the state with partition_field and parent_slice so they can be accessed by other components.
Attributes:
- parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
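Parent keys and extra fields are resolved with dpath against each parent record, so nested keys work. A small standalone sketch (the record and paths are illustrative); note that a missing path raises KeyError, which is why the router skips records whose parent_key is absent:

import dpath

parent_record = {"project": {"id": 42}, "owner": {"email": "a@b.c"}}

# parent_key resolution: string path with "/" separators
print(dpath.get(parent_record, "project/id"))        # 42
# extra_fields paths are lists of path parts, keyed later by the dot-joined path
print(dpath.get(parent_record, ["owner", "email"]))  # a@b.c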
    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

For example, you might want to define query parameters for paging if next_page_token is not None.
    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.

If this returns plain text, it is sent as-is; if it returns a dict, it is converted to a urlencoded form, e.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2".

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response
        return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.

Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at a time.
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Iterate over each parent stream's record and create a StreamSlice for each record.

        For each stream, iterate over its stream_slices.
        For each stream slice, iterate over each record.
        yield a stream slice for each such records.

        If a parent slice contains no record, emit a slice with parent_record=None.

        The template string can interpolate the following values:
        - parent_stream_slice: mapping representing the parent's stream slice
        - parent_record: mapping representing the parent record
        - parent_stream_name: string representing the parent stream name
        """
        if not self.parent_stream_configs:
            yield from []
        else:
            for parent_stream_config in self.parent_stream_configs:
                parent_stream = parent_stream_config.stream
                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string
                extra_fields = None
                if parent_stream_config.extra_fields:
                    extra_fields = [
                        [field_path_part.eval(self.config) for field_path_part in field_path]  # type: ignore [union-attr]
                        for field_path in parent_stream_config.extra_fields
                    ]

                for partition, is_last_slice in iterate_with_last_flag(
                    parent_stream.generate_partitions()
                ):
                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(
                        partition.read()
                    ):
                        # In the previous CDK implementation, state management was done internally by the stream.
                        # However, this could cause issues when doing availability check for example as the availability
                        # check would progress the state so state management was moved outside of the read method.
                        # Hence, we need to call the cursor here.
                        # Note that we call observe and close_partition before emitting the associated record as the
                        # ConcurrentPerPartitionCursor will associate a record with the state of the stream after the
                        # record was consumed.
                        parent_stream.cursor.observe(parent_record)
                        parent_partition = (
                            parent_record.associated_slice.partition
                            if parent_record.associated_slice
                            else {}
                        )
                        record_data = parent_record.data

                        try:
                            partition_value = dpath.get(
                                record_data,  # type: ignore [arg-type]
                                parent_field,
                            )
                        except KeyError:
                            # FIXME a log here would go a long way for debugging
                            continue

                        # Add extra fields
                        extracted_extra_fields = self._extract_extra_fields(
                            record_data, extra_fields
                        )

                        if parent_stream_config.lazy_read_pointer:
                            extracted_extra_fields = {
                                "child_response": self._extract_child_response(
                                    record_data,
                                    parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
                                ),
                                **extracted_extra_fields,
                            }

                        if is_last_record_in_slice:
                            parent_stream.cursor.close_partition(partition)
                            if is_last_slice:
                                parent_stream.cursor.ensure_at_least_one_state_emitted()

                        yield StreamSlice(
                            partition={
                                partition_field: partition_value,
                                "parent_slice": parent_partition or {},
                            },
                            cursor_slice={},
                            extra_fields=extracted_extra_fields,
                        )

            yield from []
Iterate over each parent stream's records and create a StreamSlice for each record.

For each stream, iterate over its stream_slices. For each stream slice, iterate over each record, and yield a stream slice for each such record.

If a parent slice contains no records, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
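To make the emitted shape concrete, here is a hedged illustration using hypothetical names (parent_key "id", partition_field "project_id"); a parent record {"id": 1} produces the slice sketched below:

# Illustrative data only: the slice shape yielded by stream_slices(), assuming
# parent_key="id" and partition_field="project_id" (hypothetical names).
emitted = {
    "partition": {
        "project_id": 1,     # value extracted from the parent record via dpath
        "parent_slice": {},  # the parent stream's own partition, if any
    },
    "cursor_slice": {},      # always empty; cursors are managed elsewhere
}
print(emitted)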
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        Returns:
            StreamState: The current state of the parent streams.

        Example of state format:
        {
            "parent_stream_name1": {
                "last_updated": "2023-05-27T00:00:00Z"
            },
            "parent_stream_name2": {
                "last_updated": "2023-05-27T00:00:00Z"
            }
        }
        """
        parent_state = {}
        for parent_config in self.parent_stream_configs:
            if parent_config.incremental_dependency:
                parent_state[parent_config.stream.name] = copy.deepcopy(
                    parent_config.stream.cursor.state
                )
        return parent_state
Get the state of the parent streams.
Returns:
StreamState: The current state of the parent streams.
Example of state format:

{
    "parent_stream_name1": {
        "last_updated": "2023-05-27T00:00:00Z"
    },
    "parent_stream_name2": {
        "last_updated": "2023-05-27T00:00:00Z"
    }
}
@dataclass
class PartitionRouter(StreamSlicer):
    """
    Base class for partition routers.

    Methods:
        get_stream_state(): Get the state of the parent streams.
    """

    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Returns:
            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
                The returned format will be:
                {
                    "parent_stream_name1": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_stream_name2": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
        """
Base class for partition routers.
Methods:
get_stream_state(): Get the state of the parent streams.
    @abstractmethod
    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Get the state of the parent streams.

        This method should only be implemented if the slicer is based on some parent stream and needs to read this stream
        incrementally using the state.

        Returns:
            Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format.
                The returned format will be:
                {
                    "parent_stream_name1": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    },
                    "parent_stream_name2": {
                        "last_updated": "2023-05-27T00:00:00Z"
                    }
                }
        """
Get the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Returns:
Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be:

{
    "parent_stream_name1": {
        "last_updated": "2023-05-27T00:00:00Z"
    },
    "parent_stream_name2": {
        "last_updated": "2023-05-27T00:00:00Z"
    }
}
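For reference, a hedged sketch of the minimal surface a concrete subclass must provide, modeled on SinglePartitionRouter above; the class itself is hypothetical, and the StreamSlice/StreamState import path may differ across CDK versions:

from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.partition_routers import PartitionRouter
from airbyte_cdk.sources.types import StreamSlice, StreamState  # path may vary by CDK version


@dataclass
class StaticPartitionRouter(PartitionRouter):
    """Hypothetical router emitting one fixed partition and no parent state."""

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        return {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        # One fixed partition; a real router would derive this from config or a parent stream.
        yield StreamSlice(partition={"region": "eu"}, cursor_slice={})

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        return None  # no parent streams, so no parent state to report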