airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
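The package re-exports its concrete retriever implementations, so connector code can import them directly from airbyte_cdk.sources.declarative.retrievers. A minimal import sketch:

from airbyte_cdk.sources.declarative.retrievers import (
    AsyncRetriever,
    LazySimpleRetriever,
    Retriever,
    SimpleRetriever,
)

# All four names are listed in the package's __all__ above.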
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        yield from []

    @property
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """
        Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        return {}

    @state.setter
    @deprecated("State management is being moved to the stream level.")
    def state(self, value: StreamState) -> None:
        """
        Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        pass
Responsible for fetching a stream's records from an HTTP API source.
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: json schema to describe record
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        yield from []
Does nothing as this method is deprecated, so underlying Retriever implementations do not need to implement this.
    @property
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """
        Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        return {}
Does nothing as this method is deprecated, so underlying Retriever implementations do not need to implement this.
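Retriever is an abstract interface; since stream_slices and state are deprecated no-ops, a concrete implementation only needs to provide read_records. Below is a hypothetical, simplified sketch (the InMemoryRetriever class and its records argument are illustrative and not part of the CDK; import paths reflect recent CDK versions):

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.types import Record, StreamSlice


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that serves records from a pre-loaded list."""

    def __init__(self, stream_name: str, records: List[Mapping[str, Any]]) -> None:
        self._stream_name = stream_name
        self._records = records

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Record]:
        # The deprecated stream_slices()/state members are inherited as no-ops,
        # so only read_records has to be implemented here.
        for data in self._records:
            yield Record(data=data, stream_name=self._stream_name, associated_slice=stream_slice)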
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None
    pagination_tracker_factory: Callable[[], PaginationTracker] = field(
        default_factory=lambda: lambda: PaginationTracker()
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise
        """
        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it, else return nothing so the requester is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state={},  # stream_state as an interpolation context is deprecated
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state={},  # stream_state as an interpolation context is deprecated
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_slice=stream_slice,
            ),
            stream_state={},  # stream_state as an interpolation context is deprecated
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        original_stream_slice = stream_slice
        pagination_tracker = self.pagination_tracker_factory()
        reset_pagination = False
        next_page_token = self._get_initial_next_page_token()
        while True:
            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None

            response = None
            try:
                if self.additional_query_properties:
                    for (
                        properties
                    ) in self.additional_query_properties.get_request_property_chunks():
                        stream_slice = StreamSlice(
                            partition=stream_slice.partition or {},
                            cursor_slice=stream_slice.cursor_slice or {},
                            extra_fields={"query_properties": properties},
                        )
                        response = self._fetch_next_page(stream_slice, next_page_token)

                        for current_record in records_generator_fn(response):
                            if self.additional_query_properties.property_chunking:
                                merge_key = self.additional_query_properties.property_chunking.get_merge_key(
                                    current_record
                                )
                                if merge_key:
                                    _deep_merge(merged_records[merge_key], current_record)
                                else:
                                    # We should still emit records even if the record did not have a merge key
                                    pagination_tracker.observe(current_record)
                                    last_page_size += 1
                                    last_record = current_record
                                    yield current_record
                            else:
                                pagination_tracker.observe(current_record)
                                last_page_size += 1
                                last_record = current_record
                                yield current_record

                    for merged_record in merged_records.values():
                        record = Record(
                            data=merged_record, stream_name=self.name, associated_slice=stream_slice
                        )
                        pagination_tracker.observe(record)
                        last_page_size += 1
                        last_record = record
                        yield record
                else:
                    response = self._fetch_next_page(stream_slice, next_page_token)
                    for current_record in records_generator_fn(response):
                        pagination_tracker.observe(current_record)
                        last_page_size += 1
                        last_record = current_record
                        yield current_record
            except PaginationResetRequiredException:
                reset_pagination = True
            else:
                if not response:
                    break

            if reset_pagination or pagination_tracker.has_reached_limit():
                next_page_token = self._get_initial_next_page_token()
                previous_slice = stream_slice
                stream_slice = pagination_tracker.reduce_slice_range_if_possible(
                    stream_slice, original_stream_slice
                )
                LOGGER.info(
                    f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
                )
                reset_pagination = False
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,  # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    break

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]:
        initial_token = self._paginator.get_initial_token()
        next_page_token = {"next_page_token": initial_token} if initial_token is not None else None
        return next_page_token

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            records_schema=records_schema,
        )
        yield from self._read_pages(record_generator, _slice)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves records by synchronously sending requests to fetch records.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, submit requests until there are no more pages of records to fetch.
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- stream_name (str): The stream's name
- stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (Optional[StreamSlicer]): The stream slicer
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
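To make the orchestration described above concrete, here is a hypothetical wiring sketch based on the dataclass fields shown in the source above. The my_requester and my_record_selector objects are placeholders that would normally be produced by the declarative component factory from a manifest; the config values are illustrative:

from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
from airbyte_cdk.sources.types import StreamSlice

# Placeholders: in a real connector these come from the declarative component
# factory (e.g. an HttpRequester and a RecordSelector built from the manifest).
my_requester = ...
my_record_selector = ...

retriever = SimpleRetriever(
    requester=my_requester,
    record_selector=my_record_selector,
    config={"api_key": "..."},  # connector config, available for interpolation
    parameters={},              # runtime parameters for string interpolation
    name="users",               # stream name (may be an interpolated string)
    primary_key="id",
    paginator=None,             # defaults to NoPagination
)

# One request per page for the slice, until the paginator returns no next_page_token.
one_slice = StreamSlice(partition={}, cursor_slice={})
for record in retriever.read_records(records_schema={}, stream_slice=one_slice):
    print(record)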
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            records_schema=records_schema,
        )
        yield from self._read_pages(record_generator, _slice)
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: json schema to describe record
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
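Note that the slice argument is optional: when stream_slice is None the method substitutes an empty StreamSlice(partition={}, cursor_slice={}) before paging, so a single default partition is read. A short sketch, assuming retriever is an already-configured SimpleRetriever:

# `retriever` is assumed to be an already-configured SimpleRetriever.
records = list(retriever.read_records(records_schema={}, stream_slice=None))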
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
         - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
           to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
        meaning that the Job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the partition from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.

        Returns:
            AsyncPartition: The partition extracted from the stream_slice.

        Raises:
            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.

        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state={},  # stream_state as an interpolation context is deprecated
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
         - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
           to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the creation_requester cannot place / create the job, it might be a case of rate limits.
- If the creation_requester can place / create the job, all other requesters should successfully manage to complete the results.
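Because the property delegates to the job repository's creation_requester, toggling it on the retriever controls whether job-creation requests give up when a rate limit is hit. A small sketch, assuming async_retriever is an already-configured AsyncRetriever whose job orchestrator has been initialized:

# Make the creation requester stop (rather than wait and retry) when rate limited.
async_retriever.exit_on_rate_limit = True
assert async_retriever.exit_on_rate_limit is True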
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state={},  # stream_state as an interpolation context is deprecated
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: json schema to describe record
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
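For AsyncRetriever the interesting part is the slice itself: the async partition router attaches the completed AsyncJob objects to the slice's extra_fields under the "jobs" key, and read_records first yields a slice-descriptor log message and then the filtered records. A hypothetical sketch (async_retriever and completed_jobs are assumed to exist):

from airbyte_cdk.sources.types import StreamSlice

# `async_retriever` is an already-configured AsyncRetriever and `completed_jobs`
# is the list of AsyncJob objects its partition router produced for this slice.
slice_with_jobs = StreamSlice(partition={}, cursor_slice={}, extra_fields={"jobs": completed_jobs})
for message in async_retriever.read_records(records_schema={}, stream_slice=slice_with_jobs):
    print(message)  # first the slice-descriptor log message, then the records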
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_slice,
                )

            yield from []
        else:
            # coderabbit detected an interesting bug/gap where if we were to not get a child_response, we
            # might recurse forever. This might not be the case, but it is worth noting that this code path
            # isn't comprehensively tested.
            yield from self._read_pages(records_generator_fn, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
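The laziness comes from the slice: the parent stream's HTTP response is expected in stream_slice.extra_fields["child_response"], so the first page is parsed without issuing a new request and only subsequent pages are fetched through the paginator. A hypothetical sketch (lazy_retriever and parent_response are assumed to exist; the partition key is illustrative):

from airbyte_cdk.sources.types import StreamSlice

# `lazy_retriever` is an already-configured LazySimpleRetriever and `parent_response`
# is the requests.Response captured while reading the parent stream.
slice_with_child_response = StreamSlice(
    partition={"parent_id": "123"},
    cursor_slice={},
    extra_fields={"child_response": parent_response},
)
for record in lazy_retriever.read_records(records_schema={}, stream_slice=slice_with_child_response):
    print(record)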