airbyte_cdk.sources.declarative.partition_routers
1# 2# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( 6 AsyncJobPartitionRouter, 7) 8from airbyte_cdk.sources.declarative.partition_routers.cartesian_product_stream_slicer import ( 9 CartesianProductStreamSlicer, 10) 11from airbyte_cdk.sources.declarative.partition_routers.list_partition_router import ( 12 ListPartitionRouter, 13) 14from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter 15from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( 16 SinglePartitionRouter, 17) 18from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( 19 SubstreamPartitionRouter, 20) 21 22__all__ = [ 23 "AsyncJobPartitionRouter", 24 "CartesianProductStreamSlicer", 25 "ListPartitionRouter", 26 "SinglePartitionRouter", 27 "SubstreamPartitionRouter", 28 "PartitionRouter", 29]
@dataclass
class AsyncJobPartitionRouter(StreamSlicer):
    """
    Partition router that submits async jobs to a source API, waits for their
    completion via the job orchestrator, and exposes each completed job's
    result locations as stream slices from which records can be extracted.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    job_orchestrator_factory: Callable[[Iterable[StreamSlice]], AsyncJobOrchestrator]
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._job_orchestrator_factory = self.job_orchestrator_factory
        # Populated lazily by the first call to stream_slices().
        self._job_orchestrator: Optional[AsyncJobOrchestrator] = None
        self._parameters = parameters

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Create async jobs for every upstream slice and yield one StreamSlice per
        completed partition, carrying the partition's jobs in `extra_fields`.
        """
        upstream_slices = self.stream_slicer.stream_slices()
        self._job_orchestrator = self._job_orchestrator_factory(upstream_slices)

        for completed in self._job_orchestrator.create_and_get_completed_partitions():
            source_slice = completed.stream_slice
            yield StreamSlice(
                partition=dict(source_slice.partition),
                cursor_slice=source_slice.cursor_slice,
                extra_fields={"jobs": list(completed.jobs)},
            )

    def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]:
        """
        This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should
        be responsible for. However, the JobOrchestrator is required to retrieve records, and
        without defining fetch_records() on this class we would be stuck either passing the
        JobOrchestrator to the AsyncRetriever or storing it on multiple classes.

        Raises if called before stream_slices() has initialized the orchestrator.
        """
        orchestrator = self._job_orchestrator
        if not orchestrator:
            raise AirbyteTracedException(
                message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support",
                internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`",
                failure_type=FailureType.system_error,
            )

        return orchestrator.fetch_records(async_jobs=async_jobs)
Partition router that creates async jobs in a source API, periodically polls for job completion, and supplies the completed job URL locations as stream slices so that records can be extracted.
40 def stream_slices(self) -> Iterable[StreamSlice]: 41 slices = self.stream_slicer.stream_slices() 42 self._job_orchestrator = self._job_orchestrator_factory(slices) 43 44 for completed_partition in self._job_orchestrator.create_and_get_completed_partitions(): 45 yield StreamSlice( 46 partition=dict(completed_partition.stream_slice.partition), 47 cursor_slice=completed_partition.stream_slice.cursor_slice, 48 extra_fields={"jobs": list(completed_partition.jobs)}, 49 )
Defines stream slices
Returns
An iterable of stream slices
51 def fetch_records(self, async_jobs: Iterable[AsyncJob]) -> Iterable[Mapping[str, Any]]: 52 """ 53 This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should 54 be responsible for. However, this was added in because the JobOrchestrator is required to 55 retrieve records. And without defining fetch_records() on this class, we're stuck with either 56 passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes. 57 """ 58 59 if not self._job_orchestrator: 60 raise AirbyteTracedException( 61 message="Invalid state within AsyncJobRetriever. Please contact Airbyte Support", 62 internal_message="AsyncPartitionRepository is expected to be accessed only after `stream_slices`", 63 failure_type=FailureType.system_error, 64 ) 65 66 return self._job_orchestrator.fetch_records(async_jobs=async_jobs)
This method of fetching records extends beyond what a PartitionRouter/StreamSlicer should be responsible for. However, this was added in because the JobOrchestrator is required to retrieve records. And without defining fetch_records() on this class, we're stuck with either passing the JobOrchestrator to the AsyncRetriever or storing it on multiple classes.
@dataclass
class CartesianProductStreamSlicer(PartitionRouter):
    """
    Stream slicer that iterates over the cartesian product of its input stream slicers.

    Given 2 stream slicers with the following slices:
    A: [{"i": 0}, {"i": 1}, {"i": 2}]
    B: [{"s": "hello"}, {"s": "world"}]
    the resulting stream slices are
    [
        {"i": 0, "s": "hello"},
        {"i": 0, "s": "world"},
        {"i": 1, "s": "hello"},
        {"i": 1, "s": "world"},
        {"i": 2, "s": "hello"},
        {"i": 2, "s": "world"},
    ]

    Attributes:
        stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions
            (e.g. request headers, parameters, etc.) returned by this slicer are the combination
            of the RequestOptions of its input slicers. Conflicts (two slicers defining the same
            header or request param) are resolved by taking the value from the first slicer, in
            the order the slicers were provided to this composite slicer.
    """

    stream_slicers: List[PartitionRouter]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        check_for_substream_in_slicers(self.stream_slicers, self.logger.warning)

    def _combined_options(self, option_maps: List[Mapping[str, Any]]) -> Mapping[str, Any]:
        """Merge per-slicer option mappings; on key conflicts the earliest slicer wins."""
        combined = {}
        # Applying later slicers first and overwriting with earlier ones gives the
        # first slicer precedence, matching ChainMap semantics.
        for options in reversed(option_maps):
            combined.update(options)
        return combined

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the query parameters contributed by every underlying slicer."""
        return self._combined_options(
            [
                slicer.get_request_params(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for slicer in self.stream_slicers
            ]
        )

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the non-auth request headers contributed by every underlying slicer."""
        return self._combined_options(
            [
                slicer.get_request_headers(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for slicer in self.stream_slicers
            ]
        )

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the urlencoded-form body data contributed by every underlying slicer."""
        return self._combined_options(
            [
                slicer.get_request_body_data(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for slicer in self.stream_slicers
            ]
        )

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Combine the JSON body payload contributed by every underlying slicer."""
        return self._combined_options(
            [
                slicer.get_request_body_json(
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
                for slicer in self.stream_slicers
            ]
        )

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Yield one slice per element of the cartesian product of the underlying
        slicers' slices. At most one sub-slice may carry a non-empty cursor_slice.
        """
        per_slicer = (slicer.stream_slices() for slicer in self.stream_slicers)
        for combo in itertools.product(*per_slicer):
            partition = self._combined_options([sub.partition for sub in combo])
            cursor_candidates = [sub.cursor_slice for sub in combo if sub.cursor_slice]
            if len(cursor_candidates) > 1:
                raise ValueError(
                    f"There should only be a single cursor slice. Found {cursor_candidates}"
                )
            cursor_slice = cursor_candidates[0] if cursor_candidates else {}
            yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        Parent stream states are not supported for cartesian product stream slicer
        """
        pass

    @property
    def logger(self) -> logging.Logger:
        return logging.getLogger("airbyte.CartesianProductStreamSlicer")
Stream slicer that iterates over the cartesian product of its input stream slicers. Given 2 stream slicers with the following slices: A: [{"i": 0}, {"i": 1}, {"i": 2}] B: [{"s": "hello"}, {"s": "world"}] the resulting stream slices are [ {"i": 0, "s": "hello"}, {"i": 0, "s": "world"}, {"i": 1, "s": "hello"}, {"i": 1, "s": "world"}, {"i": 2, "s": "hello"}, {"i": 2, "s": "world"}, ]
Attributes:
- stream_slicers (List[PartitionRouter]): Underlying stream slicers. The RequestOptions (e.g: Request headers, parameters, etc..) returned by this slicer are the combination of the RequestOptions of its input slicers. If there are conflicts e.g: two slicers define the same header or request param, the conflict is resolved by taking the value from the first slicer, where ordering is determined by the order in which slicers were input to this composite slicer.
67 def get_request_params( 68 self, 69 *, 70 stream_state: Optional[StreamState] = None, 71 stream_slice: Optional[StreamSlice] = None, 72 next_page_token: Optional[Mapping[str, Any]] = None, 73 ) -> Mapping[str, Any]: 74 return dict( 75 ChainMap( 76 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 77 s.get_request_params( 78 stream_state=stream_state, 79 stream_slice=stream_slice, 80 next_page_token=next_page_token, 81 ) 82 for s in self.stream_slicers 83 ] 84 ) 85 )
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
87 def get_request_headers( 88 self, 89 *, 90 stream_state: Optional[StreamState] = None, 91 stream_slice: Optional[StreamSlice] = None, 92 next_page_token: Optional[Mapping[str, Any]] = None, 93 ) -> Mapping[str, Any]: 94 return dict( 95 ChainMap( 96 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 97 s.get_request_headers( 98 stream_state=stream_state, 99 stream_slice=stream_slice, 100 next_page_token=next_page_token, 101 ) 102 for s in self.stream_slicers 103 ] 104 ) 105 )
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
107 def get_request_body_data( 108 self, 109 *, 110 stream_state: Optional[StreamState] = None, 111 stream_slice: Optional[StreamSlice] = None, 112 next_page_token: Optional[Mapping[str, Any]] = None, 113 ) -> Mapping[str, Any]: 114 return dict( 115 ChainMap( 116 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 117 s.get_request_body_data( 118 stream_state=stream_state, 119 stream_slice=stream_slice, 120 next_page_token=next_page_token, 121 ) 122 for s in self.stream_slicers 123 ] 124 ) 125 )
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
127 def get_request_body_json( 128 self, 129 *, 130 stream_state: Optional[StreamState] = None, 131 stream_slice: Optional[StreamSlice] = None, 132 next_page_token: Optional[Mapping[str, Any]] = None, 133 ) -> Mapping[str, Any]: 134 return dict( 135 ChainMap( 136 *[ # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 137 s.get_request_body_json( 138 stream_state=stream_state, 139 stream_slice=stream_slice, 140 next_page_token=next_page_token, 141 ) 142 for s in self.stream_slicers 143 ] 144 ) 145 )
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
147 def stream_slices(self) -> Iterable[StreamSlice]: 148 sub_slices = (s.stream_slices() for s in self.stream_slicers) 149 product = itertools.product(*sub_slices) 150 for stream_slice_tuple in product: 151 partition = dict(ChainMap(*[s.partition for s in stream_slice_tuple])) # type: ignore # ChainMap expects a MutableMapping[Never, Never] for reasons 152 cursor_slices = [s.cursor_slice for s in stream_slice_tuple if s.cursor_slice] 153 if len(cursor_slices) > 1: 154 raise ValueError( 155 f"There should only be a single cursor slice. Found {cursor_slices}" 156 ) 157 if cursor_slices: 158 cursor_slice = cursor_slices[0] 159 else: 160 cursor_slice = {} 161 yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
Defines stream slices
Returns
An iterable of stream slices
163 def set_initial_state(self, stream_state: StreamState) -> None: 164 """ 165 Parent stream states are not supported for cartesian product stream slicer 166 """ 167 pass
Parent stream states are not supported for cartesian product stream slicer
169 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 170 """ 171 Parent stream states are not supported for cartesian product stream slicer 172 """ 173 pass
Parent stream states are not supported for cartesian product stream slicer
@dataclass
class ListPartitionRouter(PartitionRouter):
    """
    Partition router that iterates over the values of a list.
    If `values` is a string, it is evaluated as an interpolated literal that must
    produce a list.

    Attributes:
        values (Union[str, List[str]]): The values to iterate over
        cursor_field (Union[InterpolatedString, str]): The name of the cursor field
        config (Config): The user-provided configuration as specified by the source's spec
        request_option (Optional[RequestOption]): The request option to configure the HTTP request
    """

    values: Union[str, List[str]]
    cursor_field: Union[InterpolatedString, str]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    request_option: Optional[RequestOption] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if isinstance(self.values, str):
            # A string "values" is an interpolated expression expected to evaluate to a list.
            self.values = InterpolatedString.create(self.values, parameters=parameters).eval(
                self.config
            )
        if isinstance(self.cursor_field, str):
            self._cursor_field = InterpolatedString(string=self.cursor_field, parameters=parameters)
        else:
            self._cursor_field = self.cursor_field

        self._cursor = None

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Query parameters injected for the current slice, if so configured."""
        # Use the stream_slice argument, not the cursor: the cursor is only updated
        # after the response is processed.
        return self._get_request_option(RequestOptionType.request_parameter, stream_slice)

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Non-auth headers injected for the current slice, if so configured."""
        # Use the stream_slice argument, not the cursor: the cursor is only updated
        # after the response is processed.
        return self._get_request_option(RequestOptionType.header, stream_slice)

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Urlencoded-form body data injected for the current slice, if so configured."""
        # Use the stream_slice argument, not the cursor: the cursor is only updated
        # after the response is processed.
        return self._get_request_option(RequestOptionType.body_data, stream_slice)

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """JSON body payload injected for the current slice, if so configured."""
        # Use the stream_slice argument, not the cursor: the cursor is only updated
        # after the response is processed.
        return self._get_request_option(RequestOptionType.body_json, stream_slice)

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Produce one slice per configured value, keyed by the evaluated cursor field."""
        return [
            StreamSlice(
                partition={self._cursor_field.eval(self.config): slice_item},
                cursor_slice={},
            )
            for slice_item in self.values
        ]

    def _get_request_option(
        self, request_option_type: RequestOptionType, stream_slice: Optional[StreamSlice]
    ) -> Mapping[str, Any]:
        """Build the request options for one injection target; empty when not applicable."""
        if not self.request_option or self.request_option.inject_into != request_option_type:
            return {}
        if not stream_slice:
            return {}
        slice_value = stream_slice.get(self._cursor_field.eval(self.config))
        # NOTE: falsy slice values (e.g. 0, "") are not injected, mirroring the
        # truthiness check used elsewhere in this module.
        if not slice_value:
            return {}
        options: MutableMapping[str, Any] = {}
        self.request_option.inject_into_request(options, slice_value, self.config)
        return options

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        ListPartitionRouter doesn't have parent streams
        """
        pass
Partition router that iterates over the values of a list If values is a string, then evaluate it as literal and assert the resulting literal is a list
Attributes:
- values (Union[str, List[str]]): The values to iterate over
- cursor_field (Union[InterpolatedString, str]): The name of the cursor field
- config (Config): The user-provided configuration as specified by the source's spec
- request_option (Optional[RequestOption]): The request option to configure the HTTP request
50 def get_request_params( 51 self, 52 stream_state: Optional[StreamState] = None, 53 stream_slice: Optional[StreamSlice] = None, 54 next_page_token: Optional[Mapping[str, Any]] = None, 55 ) -> Mapping[str, Any]: 56 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 57 return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
59 def get_request_headers( 60 self, 61 stream_state: Optional[StreamState] = None, 62 stream_slice: Optional[StreamSlice] = None, 63 next_page_token: Optional[Mapping[str, Any]] = None, 64 ) -> Mapping[str, Any]: 65 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 66 return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
68 def get_request_body_data( 69 self, 70 stream_state: Optional[StreamState] = None, 71 stream_slice: Optional[StreamSlice] = None, 72 next_page_token: Optional[Mapping[str, Any]] = None, 73 ) -> Mapping[str, Any]: 74 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 75 return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
77 def get_request_body_json( 78 self, 79 stream_state: Optional[StreamState] = None, 80 stream_slice: Optional[StreamSlice] = None, 81 next_page_token: Optional[Mapping[str, Any]] = None, 82 ) -> Mapping[str, Any]: 83 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 84 return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
86 def stream_slices(self) -> Iterable[StreamSlice]: 87 return [ 88 StreamSlice( 89 partition={self._cursor_field.eval(self.config): slice_value}, cursor_slice={} 90 ) 91 for slice_value in self.values 92 ]
Defines stream slices
Returns
An iterable of stream slices
@dataclass
class SinglePartitionRouter(PartitionRouter):
    """Partition router that always produces exactly one (empty) stream slice."""

    parameters: InitVar[Mapping[str, Any]]

    def get_request_params(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """No query parameters are contributed by this router."""
        return {}

    def get_request_headers(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """No request headers are contributed by this router."""
        return {}

    def get_request_body_data(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """No form body data is contributed by this router."""
        return {}

    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """No JSON body payload is contributed by this router."""
        return {}

    def stream_slices(self) -> Iterable[StreamSlice]:
        """Yield the single empty slice."""
        yield StreamSlice(partition={}, cursor_slice={})

    def set_initial_state(self, stream_state: StreamState) -> None:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass

    def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
        """
        SinglePartitionRouter doesn't have parent streams
        """
        pass
Partition router returning only a stream slice
19 def get_request_params( 20 self, 21 stream_state: Optional[StreamState] = None, 22 stream_slice: Optional[StreamSlice] = None, 23 next_page_token: Optional[Mapping[str, Any]] = None, 24 ) -> Mapping[str, Any]: 25 return {}
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g: you might want to define query parameters for paging if next_page_token is not None.
27 def get_request_headers( 28 self, 29 stream_state: Optional[StreamState] = None, 30 stream_slice: Optional[StreamSlice] = None, 31 next_page_token: Optional[Mapping[str, Any]] = None, 32 ) -> Mapping[str, Any]: 33 return {}
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
35 def get_request_body_data( 36 self, 37 stream_state: Optional[StreamState] = None, 38 stream_slice: Optional[StreamSlice] = None, 39 next_page_token: Optional[Mapping[str, Any]] = None, 40 ) -> Mapping[str, Any]: 41 return {}
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
43 def get_request_body_json( 44 self, 45 stream_state: Optional[StreamState] = None, 46 stream_slice: Optional[StreamSlice] = None, 47 next_page_token: Optional[Mapping[str, Any]] = None, 48 ) -> Mapping[str, Any]: 49 return {}
Specifies how to populate the body of the request with a JSON payload.
At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
51 def stream_slices(self) -> Iterable[StreamSlice]: 52 yield StreamSlice(partition={}, cursor_slice={})
Defines stream slices
Returns
An iterable of stream slices
80@dataclass 81class SubstreamPartitionRouter(PartitionRouter): 82 """ 83 Partition router that iterates over the parent's stream records and emits slices 84 Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components 85 86 Attributes: 87 parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config 88 """ 89 90 parent_stream_configs: List[ParentStreamConfig] 91 config: Config 92 parameters: InitVar[Mapping[str, Any]] 93 94 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 95 if not self.parent_stream_configs: 96 raise ValueError("SubstreamPartitionRouter needs at least 1 parent stream") 97 self._parameters = parameters 98 99 def get_request_params( 100 self, 101 stream_state: Optional[StreamState] = None, 102 stream_slice: Optional[StreamSlice] = None, 103 next_page_token: Optional[Mapping[str, Any]] = None, 104 ) -> Mapping[str, Any]: 105 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 106 return self._get_request_option(RequestOptionType.request_parameter, stream_slice) 107 108 def get_request_headers( 109 self, 110 stream_state: Optional[StreamState] = None, 111 stream_slice: Optional[StreamSlice] = None, 112 next_page_token: Optional[Mapping[str, Any]] = None, 113 ) -> Mapping[str, Any]: 114 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 115 return self._get_request_option(RequestOptionType.header, stream_slice) 116 117 def get_request_body_data( 118 self, 119 stream_state: Optional[StreamState] = None, 120 stream_slice: Optional[StreamSlice] = None, 121 next_page_token: Optional[Mapping[str, Any]] = None, 122 ) -> Mapping[str, Any]: 123 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 124 return self._get_request_option(RequestOptionType.body_data, 
stream_slice) 125 126 def get_request_body_json( 127 self, 128 stream_state: Optional[StreamState] = None, 129 stream_slice: Optional[StreamSlice] = None, 130 next_page_token: Optional[Mapping[str, Any]] = None, 131 ) -> Mapping[str, Any]: 132 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 133 return self._get_request_option(RequestOptionType.body_json, stream_slice) 134 135 def _get_request_option( 136 self, option_type: RequestOptionType, stream_slice: Optional[StreamSlice] 137 ) -> Mapping[str, Any]: 138 params: MutableMapping[str, Any] = {} 139 if stream_slice: 140 for parent_config in self.parent_stream_configs: 141 if ( 142 parent_config.request_option 143 and parent_config.request_option.inject_into == option_type 144 ): 145 key = parent_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string 146 value = stream_slice.get(key) 147 if value: 148 parent_config.request_option.inject_into_request(params, value, self.config) 149 return params 150 151 def stream_slices(self) -> Iterable[StreamSlice]: 152 """ 153 Iterate over each parent stream's record and create a StreamSlice for each record. 154 155 For each stream, iterate over its stream_slices. 156 For each stream slice, iterate over each record. 157 yield a stream slice for each such records. 158 159 If a parent slice contains no record, emit a slice with parent_record=None. 
160 161 The template string can interpolate the following values: 162 - parent_stream_slice: mapping representing the parent's stream slice 163 - parent_record: mapping representing the parent record 164 - parent_stream_name: string representing the parent stream name 165 """ 166 if not self.parent_stream_configs: 167 yield from [] 168 else: 169 for parent_stream_config in self.parent_stream_configs: 170 parent_stream = parent_stream_config.stream 171 parent_field = parent_stream_config.parent_key.eval(self.config) # type: ignore # parent_key is always casted to an interpolated string 172 partition_field = parent_stream_config.partition_field.eval(self.config) # type: ignore # partition_field is always casted to an interpolated string 173 extra_fields = None 174 if parent_stream_config.extra_fields: 175 extra_fields = [ 176 [field_path_part.eval(self.config) for field_path_part in field_path] # type: ignore [union-attr] 177 for field_path in parent_stream_config.extra_fields 178 ] 179 180 # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does 181 # not support either substreams or RFR, but something that needs to be considered once we do 182 for parent_record in parent_stream.read_only_records(): 183 parent_partition = None 184 # Skip non-records (eg AirbyteLogMessage) 185 if isinstance(parent_record, AirbyteMessage): 186 self.logger.warning( 187 f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state." 
188 ) 189 if parent_record.type == MessageType.RECORD: 190 parent_record = parent_record.record.data # type: ignore[union-attr, assignment] # record is always a Record 191 else: 192 continue 193 elif isinstance(parent_record, Record): 194 parent_partition = ( 195 parent_record.associated_slice.partition 196 if parent_record.associated_slice 197 else {} 198 ) 199 parent_record = parent_record.data 200 elif not isinstance(parent_record, Mapping): 201 # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid 202 raise AirbyteTracedException( 203 message=f"Parent stream returned records as invalid type {type(parent_record)}" 204 ) 205 try: 206 partition_value = dpath.get( 207 parent_record, # type: ignore [arg-type] 208 parent_field, 209 ) 210 except KeyError: 211 continue 212 213 # Add extra fields 214 extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields) 215 216 if parent_stream_config.lazy_read_pointer: 217 extracted_extra_fields = { 218 "child_response": self._extract_child_response( 219 parent_record, 220 parent_stream_config.lazy_read_pointer, # type: ignore[arg-type] # lazy_read_pointer type handeled in __post_init__ of parent_stream_config 221 ), 222 **extracted_extra_fields, 223 } 224 225 yield StreamSlice( 226 partition={ 227 partition_field: partition_value, 228 "parent_slice": parent_partition or {}, 229 }, 230 cursor_slice={}, 231 extra_fields=extracted_extra_fields, 232 ) 233 234 def _extract_child_response( 235 self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString] 236 ) -> requests.Response: 237 """Extract child records from a parent record based on lazy pointers.""" 238 239 def _create_response(data: MutableMapping[str, Any]) -> SafeResponse: 240 """Create a SafeResponse with the given data.""" 241 response = SafeResponse() 242 response.content = json.dumps(data).encode("utf-8") 243 response.status_code = 200 244 return response 245 246 
path = [path.eval(self.config) for path in pointer] 247 return _create_response(dpath.get(parent_record, path, default=[])) # type: ignore # argunet will be a MutableMapping, given input data structure 248 249 def _extract_extra_fields( 250 self, 251 parent_record: Mapping[str, Any] | AirbyteMessage, 252 extra_fields: Optional[List[List[str]]] = None, 253 ) -> Mapping[str, Any]: 254 """ 255 Extracts additional fields specified by their paths from the parent record. 256 257 Args: 258 parent_record (Mapping[str, Any]): The record from the parent stream to extract fields from. 259 extra_fields (Optional[List[List[str]]]): A list of field paths (as lists of strings) to extract from the parent record. 260 261 Returns: 262 Mapping[str, Any]: A dictionary containing the extracted fields. 263 The keys are the joined field paths, and the values are the corresponding extracted values. 264 """ 265 extracted_extra_fields = {} 266 if extra_fields: 267 for extra_field_path in extra_fields: 268 try: 269 extra_field_value = dpath.get( 270 parent_record, # type: ignore [arg-type] 271 extra_field_path, 272 ) 273 self.logger.debug( 274 f"Extracted extra_field_path: {extra_field_path} with value: {extra_field_value}" 275 ) 276 except KeyError: 277 self.logger.debug(f"Failed to extract extra_field_path: {extra_field_path}") 278 extra_field_value = None 279 extracted_extra_fields[".".join(extra_field_path)] = extra_field_value 280 return extracted_extra_fields 281 282 def set_initial_state(self, stream_state: StreamState) -> None: 283 """ 284 Set the state of the parent streams. 285 286 If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format. 287 This migration applies only to parent streams with incremental dependencies. 288 289 Args: 290 stream_state (StreamState): The state of the streams to be set. 
291 292 Example of state format: 293 { 294 "parent_state": { 295 "parent_stream_name1": { 296 "last_updated": "2023-05-27T00:00:00Z" 297 }, 298 "parent_stream_name2": { 299 "last_updated": "2023-05-27T00:00:00Z" 300 } 301 } 302 } 303 304 Example of migrating to parent state format: 305 - Initial state: 306 { 307 "updated_at": "2023-05-27T00:00:00Z" 308 } 309 - After migration: 310 { 311 "updated_at": "2023-05-27T00:00:00Z", 312 "parent_state": { 313 "parent_stream_name": { 314 "parent_stream_cursor": "2023-05-27T00:00:00Z" 315 } 316 } 317 } 318 """ 319 if not stream_state: 320 return 321 322 parent_state = stream_state.get("parent_state", {}) 323 324 # Set state for each parent stream with an incremental dependency 325 for parent_config in self.parent_stream_configs: 326 if ( 327 not parent_state.get(parent_config.stream.name, {}) 328 and parent_config.incremental_dependency 329 ): 330 # Migrate child state to parent state format 331 parent_state = self._migrate_child_state_to_parent_state(stream_state) 332 333 if parent_config.incremental_dependency: 334 parent_config.stream.state = parent_state.get(parent_config.stream.name, {}) 335 336 def _migrate_child_state_to_parent_state(self, stream_state: StreamState) -> StreamState: 337 """ 338 Migrate the child or global stream state into the parent stream's state format. 339 340 This method converts the child stream state—or, if present, the global state—into a format that is 341 compatible with parent streams that use incremental synchronization. The migration occurs only for 342 parent streams with incremental dependencies. It filters out per-partition states and retains only the 343 global state in the form {cursor_field: cursor_value}. 
344 345 The method supports multiple input formats: 346 - A simple global state, e.g.: 347 {"updated_at": "2023-05-27T00:00:00Z"} 348 - A state object that contains a "state" key (which is assumed to hold the global state), e.g.: 349 {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...} 350 In this case, the migration uses the first value from the "state" dictionary. 351 - Any per-partition state formats or other non-simple structures are ignored during migration. 352 353 Args: 354 stream_state (StreamState): The state to migrate. Expected formats include: 355 - {"updated_at": "2023-05-27T00:00:00Z"} 356 - {"state": {"updated_at": "2023-05-27T00:00:00Z"}, ...} 357 (In this format, only the first global state value is used, and per-partition states are ignored.) 358 359 Returns: 360 StreamState: A migrated state for parent streams in the format: 361 { 362 "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"} 363 } 364 where each parent stream with an incremental dependency is assigned its corresponding cursor value. 365 366 Example: 367 Input: {"updated_at": "2023-05-27T00:00:00Z"} 368 Output: { 369 "parent_stream_name": {"parent_stream_cursor": "2023-05-27T00:00:00Z"} 370 } 371 """ 372 substream_state_values = list(stream_state.values()) 373 substream_state = substream_state_values[0] if substream_state_values else {} 374 375 # Ignore per-partition states or invalid formats. 376 if isinstance(substream_state, (list, dict)) or len(substream_state_values) != 1: 377 # If a global state is present under the key "state", use its first value. 378 if "state" in stream_state and isinstance(stream_state["state"], dict): 379 substream_state = list(stream_state["state"].values())[0] 380 else: 381 return {} 382 383 # Build the parent state for all parent streams with incremental dependencies. 
384 parent_state = {} 385 if substream_state: 386 for parent_config in self.parent_stream_configs: 387 if parent_config.incremental_dependency: 388 parent_state[parent_config.stream.name] = { 389 parent_config.stream.cursor_field: substream_state 390 } 391 392 return parent_state 393 394 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 395 """ 396 Get the state of the parent streams. 397 398 Returns: 399 StreamState: The current state of the parent streams. 400 401 Example of state format: 402 { 403 "parent_stream_name1": { 404 "last_updated": "2023-05-27T00:00:00Z" 405 }, 406 "parent_stream_name2": { 407 "last_updated": "2023-05-27T00:00:00Z" 408 } 409 } 410 """ 411 parent_state = {} 412 for parent_config in self.parent_stream_configs: 413 if parent_config.incremental_dependency: 414 parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state) 415 return parent_state 416 417 @property 418 def logger(self) -> logging.Logger: 419 return logging.getLogger("airbyte.SubstreamPartitionRouter")
Partition router that iterates over the parent's stream records and emits slices
Will populate the state with `partition_field` and `parent_slice` so they can be accessed by other components.
Attributes:
- parent_stream_configs (List[ParentStreamConfig]): parent streams to iterate over and their config
99 def get_request_params( 100 self, 101 stream_state: Optional[StreamState] = None, 102 stream_slice: Optional[StreamSlice] = None, 103 next_page_token: Optional[Mapping[str, Any]] = None, 104 ) -> Mapping[str, Any]: 105 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 106 return self._get_request_option(RequestOptionType.request_parameter, stream_slice)
Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
E.g.: you might want to define query parameters for paging if next_page_token is not None.
108 def get_request_headers( 109 self, 110 stream_state: Optional[StreamState] = None, 111 stream_slice: Optional[StreamSlice] = None, 112 next_page_token: Optional[Mapping[str, Any]] = None, 113 ) -> Mapping[str, Any]: 114 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 115 return self._get_request_option(RequestOptionType.header, stream_slice)
Return any non-auth headers. Authentication headers will overwrite any overlapping headers returned from this method.
117 def get_request_body_data( 118 self, 119 stream_state: Optional[StreamState] = None, 120 stream_slice: Optional[StreamSlice] = None, 121 next_page_token: Optional[Mapping[str, Any]] = None, 122 ) -> Mapping[str, Any]: 123 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 124 return self._get_request_option(RequestOptionType.body_data, stream_slice)
Specifies how to populate the body of the request with a non-JSON payload.
If it returns plain text, it will be sent as is. If it returns a dict, it will be converted to a urlencoded form. E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
126 def get_request_body_json( 127 self, 128 stream_state: Optional[StreamState] = None, 129 stream_slice: Optional[StreamSlice] = None, 130 next_page_token: Optional[Mapping[str, Any]] = None, 131 ) -> Mapping[str, Any]: 132 # Pass the stream_slice from the argument, not the cursor because the cursor is updated after processing the response 133 return self._get_request_option(RequestOptionType.body_json, stream_slice)
Specifies how to populate the body of the request with a JSON payload.
Only one of the 'request_body_data' and 'request_body_json' functions may be overridden at the same time.
def stream_slices(self) -> Iterable[StreamSlice]:
    """
    Iterate over every record of each parent stream and yield one StreamSlice
    per record.

    For each configured parent stream, iterate over its stream slices and, for
    each slice, over its records, yielding a stream slice for each record.

    If a parent slice contains no record, emit a slice with parent_record=None.

    The template string can interpolate the following values:
    - parent_stream_slice: mapping representing the parent's stream slice
    - parent_record: mapping representing the parent record
    - parent_stream_name: string representing the parent stream name
    """
    if not self.parent_stream_configs:
        return
    for parent_config in self.parent_stream_configs:
        parent_stream = parent_config.stream
        parent_key = parent_config.parent_key.eval(self.config)  # type: ignore # parent_key is always casted to an interpolated string
        partition_key = parent_config.partition_field.eval(self.config)  # type: ignore # partition_field is always casted to an interpolated string

        # Resolve the configured extra-field paths (each a list of path parts).
        extra_field_paths = None
        if parent_config.extra_fields:
            extra_field_paths = [
                [part.eval(self.config) for part in field_path]  # type: ignore [union-attr]
                for field_path in parent_config.extra_fields
            ]

        # NOTE(review): read_only_records() appears to assume the parent is not
        # concurrent; revisit once the concurrent CDK supports substreams/RFR.
        for parent_record in parent_stream.read_only_records():
            parent_partition = None
            if isinstance(parent_record, AirbyteMessage):
                # Non-record messages (e.g. AirbyteLogMessage) are skipped.
                self.logger.warning(
                    f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
                )
                if parent_record.type != MessageType.RECORD:
                    continue
                parent_record = parent_record.record.data  # type: ignore[union-attr, assignment] # record is always a Record
            elif isinstance(parent_record, Record):
                if parent_record.associated_slice:
                    parent_partition = parent_record.associated_slice.partition
                else:
                    parent_partition = {}
                parent_record = parent_record.data
            elif not isinstance(parent_record, Mapping):
                # Only Record, AirbyteMessage, or Mapping are valid record shapes.
                raise AirbyteTracedException(
                    message=f"Parent stream returned records as invalid type {type(parent_record)}"
                )

            try:
                partition_value = dpath.get(
                    parent_record,  # type: ignore [arg-type]
                    parent_key,
                )
            except KeyError:
                # The parent record lacks the partition key; skip it.
                continue

            # Collect any additional requested fields from the parent record.
            extracted = self._extract_extra_fields(parent_record, extra_field_paths)

            if parent_config.lazy_read_pointer:
                extracted = {
                    "child_response": self._extract_child_response(
                        parent_record,
                        parent_config.lazy_read_pointer,  # type: ignore[arg-type] # lazy_read_pointer type handled in __post_init__ of parent_stream_config
                    ),
                    **extracted,
                }

            yield StreamSlice(
                partition={
                    partition_key: partition_value,
                    "parent_slice": parent_partition or {},
                },
                cursor_slice={},
                extra_fields=extracted,
            )
Iterate over each parent stream's record and create a StreamSlice for each record.
For each stream, iterate over its stream_slices. For each stream slice, iterate over each record. Yield a stream slice for each such record.
If a parent slice contains no record, emit a slice with parent_record=None.
The template string can interpolate the following values:
- parent_stream_slice: mapping representing the parent's stream slice
- parent_record: mapping representing the parent record
- parent_stream_name: string representing the parent stream name
282 def set_initial_state(self, stream_state: StreamState) -> None: 283 """ 284 Set the state of the parent streams. 285 286 If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format. 287 This migration applies only to parent streams with incremental dependencies. 288 289 Args: 290 stream_state (StreamState): The state of the streams to be set. 291 292 Example of state format: 293 { 294 "parent_state": { 295 "parent_stream_name1": { 296 "last_updated": "2023-05-27T00:00:00Z" 297 }, 298 "parent_stream_name2": { 299 "last_updated": "2023-05-27T00:00:00Z" 300 } 301 } 302 } 303 304 Example of migrating to parent state format: 305 - Initial state: 306 { 307 "updated_at": "2023-05-27T00:00:00Z" 308 } 309 - After migration: 310 { 311 "updated_at": "2023-05-27T00:00:00Z", 312 "parent_state": { 313 "parent_stream_name": { 314 "parent_stream_cursor": "2023-05-27T00:00:00Z" 315 } 316 } 317 } 318 """ 319 if not stream_state: 320 return 321 322 parent_state = stream_state.get("parent_state", {}) 323 324 # Set state for each parent stream with an incremental dependency 325 for parent_config in self.parent_stream_configs: 326 if ( 327 not parent_state.get(parent_config.stream.name, {}) 328 and parent_config.incremental_dependency 329 ): 330 # Migrate child state to parent state format 331 parent_state = self._migrate_child_state_to_parent_state(stream_state) 332 333 if parent_config.incremental_dependency: 334 parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
Set the state of the parent streams.
If the `parent_state` key is missing from `stream_state`, migrate the child stream state to the parent stream's state format.
This migration applies only to parent streams with incremental dependencies.
Arguments:
- stream_state (StreamState): The state of the streams to be set.
Example of state format: { "parent_state": { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } } }
Example of migrating to parent state format:
- Initial state: { "updated_at": "2023-05-27T00:00:00Z" }
- After migration: { "updated_at": "2023-05-27T00:00:00Z", "parent_state": { "parent_stream_name": { "parent_stream_cursor": "2023-05-27T00:00:00Z" } } }
394 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 395 """ 396 Get the state of the parent streams. 397 398 Returns: 399 StreamState: The current state of the parent streams. 400 401 Example of state format: 402 { 403 "parent_stream_name1": { 404 "last_updated": "2023-05-27T00:00:00Z" 405 }, 406 "parent_stream_name2": { 407 "last_updated": "2023-05-27T00:00:00Z" 408 } 409 } 410 """ 411 parent_state = {} 412 for parent_config in self.parent_stream_configs: 413 if parent_config.incremental_dependency: 414 parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state) 415 return parent_state
Get the state of the parent streams.
Returns:
StreamState: The current state of the parent streams.
Example of state format: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }
14@dataclass 15class PartitionRouter(StreamSlicer): 16 """ 17 Base class for partition routers. 18 Methods: 19 set_parent_state(stream_state): Set the state of the parent streams. 20 get_parent_state(): Get the state of the parent streams. 21 """ 22 23 @abstractmethod 24 def set_initial_state(self, stream_state: StreamState) -> None: 25 """ 26 Set the state of the parent streams. 27 28 This method should only be implemented if the slicer is based on some parent stream and needs to read this stream 29 incrementally using the state. 30 31 Args: 32 stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 33 'parent_state' which is a dictionary of parent state names to their corresponding state. 34 Example: 35 { 36 "parent_state": { 37 "parent_stream_name_1": { ... }, 38 "parent_stream_name_2": { ... }, 39 ... 40 } 41 } 42 """ 43 44 @abstractmethod 45 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 46 """ 47 Get the state of the parent streams. 48 49 This method should only be implemented if the slicer is based on some parent stream and needs to read this stream 50 incrementally using the state. 51 52 Returns: 53 Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. 54 The returned format will be: 55 { 56 "parent_stream_name1": { 57 "last_updated": "2023-05-27T00:00:00Z" 58 }, 59 "parent_stream_name2": { 60 "last_updated": "2023-05-27T00:00:00Z" 61 } 62 } 63 """
Base class for partition routers.
Methods:
set_initial_state(stream_state): Set the state of the parent streams. get_stream_state(): Get the state of the parent streams.
23 @abstractmethod 24 def set_initial_state(self, stream_state: StreamState) -> None: 25 """ 26 Set the state of the parent streams. 27 28 This method should only be implemented if the slicer is based on some parent stream and needs to read this stream 29 incrementally using the state. 30 31 Args: 32 stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 33 'parent_state' which is a dictionary of parent state names to their corresponding state. 34 Example: 35 { 36 "parent_state": { 37 "parent_stream_name_1": { ... }, 38 "parent_stream_name_2": { ... }, 39 ... 40 } 41 } 42 """
Set the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Arguments:
- stream_state (StreamState): The state of the streams to be set. The expected format is a dictionary that includes 'parent_state' which is a dictionary of parent state names to their corresponding state. Example: { "parent_state": { "parent_stream_name_1": { ... }, "parent_stream_name_2": { ... }, ... } }
44 @abstractmethod 45 def get_stream_state(self) -> Optional[Mapping[str, StreamState]]: 46 """ 47 Get the state of the parent streams. 48 49 This method should only be implemented if the slicer is based on some parent stream and needs to read this stream 50 incrementally using the state. 51 52 Returns: 53 Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. 54 The returned format will be: 55 { 56 "parent_stream_name1": { 57 "last_updated": "2023-05-27T00:00:00Z" 58 }, 59 "parent_stream_name2": { 60 "last_updated": "2023-05-27T00:00:00Z" 61 } 62 } 63 """
Get the state of the parent streams.
This method should only be implemented if the slicer is based on some parent stream and needs to read this stream incrementally using the state.
Returns:
Optional[Mapping[str, StreamState]]: The current state of the parent streams in a dictionary format. The returned format will be: { "parent_stream_name1": { "last_updated": "2023-05-27T00:00:00Z" }, "parent_stream_name2": { "last_updated": "2023-05-27T00:00:00Z" } }