airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
        {
            self.cursor_field: "cursor_value"
        }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
Responsible for fetching a stream's records from an HTTP API source.
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
    @abstractmethod
    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""
Returns the stream slices
    @property
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
        {
            self.cursor_field: "cursor_value"
        }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
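For illustration, a minimal sketch of a custom retriever implementing this interface follows. The MyApiRetriever class, its endpoint, and the requests-based fetching are hypothetical (not part of the CDK), and the type hints are loosened to plain mappings for brevity:

from typing import Any, Iterable, Mapping, Optional

import requests

from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever


class MyApiRetriever(Retriever):
    """Hypothetical retriever that reads a single, unpaginated JSON endpoint."""

    def __init__(self, url: str) -> None:
        self._url = url
        # A small, serializable state mapping, e.g. {"updated_at": "2024-05-01T00:00:00Z"}
        self._state: Mapping[str, Any] = {}

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # One request per slice; every element of the "items" array becomes a record.
        response = requests.get(self._url, params=dict(stream_slice or {}))
        response.raise_for_status()
        yield from response.json().get("items", [])

    def stream_slices(self) -> Iterable[Optional[Mapping[str, Any]]]:
        # A single empty slice: the whole stream is read in one pass.
        yield {}

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value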
59@dataclass 60class SimpleRetriever(Retriever): 61 """ 62 Retrieves records by synchronously sending requests to fetch records. 63 64 The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer. 65 66 For each stream slice, submit requests until there are no more pages of records to fetch. 67 68 This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. 69 As a result, some of the parameters passed to some methods are unused. 70 The two will be decoupled in a future release. 71 72 Attributes: 73 stream_name (str): The stream's name 74 stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key 75 requester (Requester): The HTTP requester 76 record_selector (HttpSelector): The record selector 77 paginator (Optional[Paginator]): The paginator 78 stream_slicer (Optional[StreamSlicer]): The stream slicer 79 cursor (Optional[cursor]): The cursor 80 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 81 """ 82 83 requester: Requester 84 record_selector: HttpSelector 85 config: Config 86 parameters: InitVar[Mapping[str, Any]] 87 name: str 88 _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") 89 primary_key: Optional[Union[str, List[str], List[List[str]]]] 90 _primary_key: str = field(init=False, repr=False, default="") 91 paginator: Optional[Paginator] = None 92 stream_slicer: StreamSlicer = field( 93 default_factory=lambda: SinglePartitionRouter(parameters={}) 94 ) 95 request_option_provider: RequestOptionsProvider = field( 96 default_factory=lambda: DefaultRequestOptionsProvider(parameters={}) 97 ) 98 cursor: Optional[DeclarativeCursor] = None 99 ignore_stream_slicer_parameters_on_paginated_requests: bool = False 100 additional_query_properties: Optional[QueryProperties] = None 101 log_formatter: Optional[Callable[[requests.Response], Any]] = None 102 pagination_tracker_factory: Callable[[], PaginationTracker] = field( 103 default_factory=lambda: lambda: PaginationTracker() 104 ) 105 106 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 107 # while changing `ModelToComponentFactory.create_simple_retriever` to accept a cursor, the sources implementing 108 # a CustomRetriever inheriting for SimpleRetriever needed to have the following validation added. 109 self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor 110 self._paginator = self.paginator or NoPagination(parameters=parameters) 111 self._parameters = parameters 112 self._name = ( 113 InterpolatedString(self._name, parameters=parameters) 114 if isinstance(self._name, str) 115 else self._name 116 ) 117 118 @property # type: ignore 119 def name(self) -> str: 120 """ 121 :return: Stream name 122 """ 123 return ( 124 str(self._name.eval(self.config)) 125 if isinstance(self._name, InterpolatedString) 126 else self._name 127 ) 128 129 @name.setter 130 def name(self, value: str) -> None: 131 if not isinstance(value, property): 132 self._name = value 133 134 def _get_mapping( 135 self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any 136 ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]: 137 """ 138 Get mapping from the provided method, and get the keys of the mapping. 139 If the method returns a string, it will return the string and an empty set. 140 If the method returns a dict, it will return the dict and its keys. 
141 """ 142 mapping = method(**kwargs) or {} 143 keys = set(mapping.keys()) if not isinstance(mapping, str) else set() 144 return mapping, keys 145 146 def _get_request_options( 147 self, 148 stream_state: Optional[StreamData], 149 stream_slice: Optional[StreamSlice], 150 next_page_token: Optional[Mapping[str, Any]], 151 paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 152 stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 153 ) -> Union[Mapping[str, Any], str]: 154 """ 155 Get the request_option from the paginator and the stream slicer. 156 Raise a ValueError if there's a key collision 157 Returned merged mapping otherwise 158 """ 159 # FIXME we should eventually remove the usage of stream_state as part of the interpolation 160 161 is_body_json = paginator_method.__name__ == "get_request_body_json" 162 163 mappings = [ 164 paginator_method( 165 stream_slice=stream_slice, 166 next_page_token=next_page_token, 167 ), 168 ] 169 if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests: 170 mappings.append( 171 stream_slicer_method( 172 stream_slice=stream_slice, 173 next_page_token=next_page_token, 174 ) 175 ) 176 return combine_mappings(mappings, allow_same_value_merge=is_body_json) 177 178 def _request_headers( 179 self, 180 stream_state: Optional[StreamData] = None, 181 stream_slice: Optional[StreamSlice] = None, 182 next_page_token: Optional[Mapping[str, Any]] = None, 183 ) -> Mapping[str, Any]: 184 """ 185 Specifies request headers. 186 Authentication headers will overwrite any overlapping headers returned from this method. 187 """ 188 headers = self._get_request_options( 189 stream_state, 190 stream_slice, 191 next_page_token, 192 self._paginator.get_request_headers, 193 self.request_option_provider.get_request_headers, 194 ) 195 if isinstance(headers, str): 196 raise ValueError("Request headers cannot be a string") 197 return {str(k): str(v) for k, v in headers.items()} 198 199 def _request_params( 200 self, 201 stream_state: Optional[StreamData] = None, 202 stream_slice: Optional[StreamSlice] = None, 203 next_page_token: Optional[Mapping[str, Any]] = None, 204 ) -> Mapping[str, Any]: 205 """ 206 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 207 208 E.g: you might want to define query parameters for paging if next_page_token is not None. 209 """ 210 params = self._get_request_options( 211 stream_state, 212 stream_slice, 213 next_page_token, 214 self._paginator.get_request_params, 215 self.request_option_provider.get_request_params, 216 ) 217 if isinstance(params, str): 218 raise ValueError("Request params cannot be a string") 219 return params 220 221 def _request_body_data( 222 self, 223 stream_state: Optional[StreamData] = None, 224 stream_slice: Optional[StreamSlice] = None, 225 next_page_token: Optional[Mapping[str, Any]] = None, 226 ) -> Union[Mapping[str, Any], str]: 227 """ 228 Specifies how to populate the body of the request with a non-JSON payload. 229 230 If returns a ready text that it will be sent as is. 231 If returns a dict that it will be converted to a urlencoded form. 232 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 233 234 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 
235 """ 236 return self._get_request_options( 237 stream_state, 238 stream_slice, 239 next_page_token, 240 self._paginator.get_request_body_data, 241 self.request_option_provider.get_request_body_data, 242 ) 243 244 def _request_body_json( 245 self, 246 stream_state: Optional[StreamData] = None, 247 stream_slice: Optional[StreamSlice] = None, 248 next_page_token: Optional[Mapping[str, Any]] = None, 249 ) -> Optional[Mapping[str, Any]]: 250 """ 251 Specifies how to populate the body of the request with a JSON payload. 252 253 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 254 """ 255 body_json = self._get_request_options( 256 stream_state, 257 stream_slice, 258 next_page_token, 259 self._paginator.get_request_body_json, 260 self.request_option_provider.get_request_body_json, 261 ) 262 if isinstance(body_json, str): 263 raise ValueError("Request body json cannot be a string") 264 return body_json 265 266 def _paginator_path( 267 self, 268 next_page_token: Optional[Mapping[str, Any]] = None, 269 stream_state: Optional[Mapping[str, Any]] = None, 270 stream_slice: Optional[StreamSlice] = None, 271 ) -> Optional[str]: 272 """ 273 If the paginator points to a path, follow it, else return nothing so the requester is used. 274 :param next_page_token: 275 :return: 276 """ 277 return self._paginator.path( 278 next_page_token=next_page_token, 279 stream_state=stream_state, 280 stream_slice=stream_slice, 281 ) 282 283 def _parse_response( 284 self, 285 response: Optional[requests.Response], 286 stream_state: StreamState, 287 records_schema: Mapping[str, Any], 288 stream_slice: Optional[StreamSlice] = None, 289 next_page_token: Optional[Mapping[str, Any]] = None, 290 ) -> Iterable[Record]: 291 if not response: 292 yield from [] 293 else: 294 yield from self.record_selector.select_records( 295 response=response, 296 stream_state=stream_state, 297 records_schema=records_schema, 298 stream_slice=stream_slice, 299 next_page_token=next_page_token, 300 ) 301 302 @property # type: ignore 303 def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: 304 """The stream's primary key""" 305 return self._primary_key 306 307 @primary_key.setter 308 def primary_key(self, value: str) -> None: 309 if not isinstance(value, property): 310 self._primary_key = value 311 312 def _next_page_token( 313 self, 314 response: requests.Response, 315 last_page_size: int, 316 last_record: Optional[Record], 317 last_page_token_value: Optional[Any], 318 ) -> Optional[Mapping[str, Any]]: 319 """ 320 Specifies a pagination strategy. 321 322 The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params. 323 324 :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response. 
325 """ 326 return self._paginator.next_page_token( 327 response=response, 328 last_page_size=last_page_size, 329 last_record=last_record, 330 last_page_token_value=last_page_token_value, 331 ) 332 333 def _fetch_next_page( 334 self, 335 stream_state: Mapping[str, Any], 336 stream_slice: StreamSlice, 337 next_page_token: Optional[Mapping[str, Any]] = None, 338 ) -> Optional[requests.Response]: 339 return self.requester.send_request( 340 path=self._paginator_path( 341 next_page_token=next_page_token, 342 stream_state=stream_state, 343 stream_slice=stream_slice, 344 ), 345 stream_state=stream_state, 346 stream_slice=stream_slice, 347 next_page_token=next_page_token, 348 request_headers=self._request_headers( 349 stream_state=stream_state, 350 stream_slice=stream_slice, 351 next_page_token=next_page_token, 352 ), 353 request_params=self._request_params( 354 stream_state=stream_state, 355 stream_slice=stream_slice, 356 next_page_token=next_page_token, 357 ), 358 request_body_data=self._request_body_data( 359 stream_state=stream_state, 360 stream_slice=stream_slice, 361 next_page_token=next_page_token, 362 ), 363 request_body_json=self._request_body_json( 364 stream_state=stream_state, 365 stream_slice=stream_slice, 366 next_page_token=next_page_token, 367 ), 368 log_formatter=self.log_formatter, 369 ) 370 371 # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well. 372 def _read_pages( 373 self, 374 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 375 stream_state: Mapping[str, Any], 376 stream_slice: StreamSlice, 377 ) -> Iterable[Record]: 378 pagination_tracker = self.pagination_tracker_factory() 379 reset_pagination = False 380 next_page_token = self._get_initial_next_page_token() 381 while True: 382 merged_records: MutableMapping[str, Any] = defaultdict(dict) 383 last_page_size = 0 384 last_record: Optional[Record] = None 385 386 response = None 387 try: 388 if ( 389 self.additional_query_properties 390 and self.additional_query_properties.property_chunking 391 ): 392 for properties in self.additional_query_properties.get_request_property_chunks( 393 stream_slice=stream_slice 394 ): 395 stream_slice = StreamSlice( 396 partition=stream_slice.partition or {}, 397 cursor_slice=stream_slice.cursor_slice or {}, 398 extra_fields={"query_properties": properties}, 399 ) 400 response = self._fetch_next_page( 401 stream_state, stream_slice, next_page_token 402 ) 403 404 for current_record in records_generator_fn(response): 405 merge_key = ( 406 self.additional_query_properties.property_chunking.get_merge_key( 407 current_record 408 ) 409 ) 410 if merge_key: 411 _deep_merge(merged_records[merge_key], current_record) 412 else: 413 # We should still emit records even if the record did not have a merge key 414 pagination_tracker.observe(current_record) 415 last_page_size += 1 416 last_record = current_record 417 yield current_record 418 419 for merged_record in merged_records.values(): 420 record = Record( 421 data=merged_record, stream_name=self.name, associated_slice=stream_slice 422 ) 423 pagination_tracker.observe(record) 424 last_page_size += 1 425 last_record = record 426 yield record 427 else: 428 response = self._fetch_next_page(stream_state, stream_slice, next_page_token) 429 for current_record in records_generator_fn(response): 430 pagination_tracker.observe(current_record) 431 last_page_size += 1 432 last_record = current_record 433 yield current_record 434 except 
PaginationResetRequiredException: 435 reset_pagination = True 436 else: 437 if not response: 438 break 439 440 if reset_pagination or pagination_tracker.has_reached_limit(): 441 next_page_token = self._get_initial_next_page_token() 442 previous_slice = stream_slice 443 stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice) 444 LOGGER.info( 445 f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}" 446 ) 447 reset_pagination = False 448 else: 449 last_page_token_value = ( 450 next_page_token.get("next_page_token") if next_page_token else None 451 ) 452 next_page_token = self._next_page_token( 453 response=response, # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine 454 last_page_size=last_page_size, 455 last_record=last_record, 456 last_page_token_value=last_page_token_value, 457 ) 458 if not next_page_token: 459 break 460 461 # Always return an empty generator just in case no records were ever yielded 462 yield from [] 463 464 def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]: 465 initial_token = self._paginator.get_initial_token() 466 next_page_token = {"next_page_token": initial_token} if initial_token is not None else None 467 return next_page_token 468 469 def _read_single_page( 470 self, 471 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 472 stream_state: Mapping[str, Any], 473 stream_slice: StreamSlice, 474 ) -> Iterable[StreamData]: 475 initial_token = stream_state.get("next_page_token") 476 if initial_token is None: 477 initial_token = self._paginator.get_initial_token() 478 next_page_token: Optional[Mapping[str, Any]] = ( 479 {"next_page_token": initial_token} if initial_token else None 480 ) 481 482 response = self._fetch_next_page(stream_state, stream_slice, next_page_token) 483 484 last_page_size = 0 485 last_record: Optional[Record] = None 486 for record in records_generator_fn(response): 487 last_page_size += 1 488 last_record = record 489 yield record 490 491 if not response: 492 next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True} 493 else: 494 last_page_token_value = ( 495 next_page_token.get("next_page_token") if next_page_token else None 496 ) 497 next_page_token = self._next_page_token( 498 response=response, 499 last_page_size=last_page_size, 500 last_record=last_record, 501 last_page_token_value=last_page_token_value, 502 ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True} 503 504 if self.cursor: 505 self.cursor.close_slice( 506 StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition) 507 ) 508 509 # Always return an empty generator just in case no records were ever yielded 510 yield from [] 511 512 def read_records( 513 self, 514 records_schema: Mapping[str, Any], 515 stream_slice: Optional[StreamSlice] = None, 516 ) -> Iterable[StreamData]: 517 """ 518 Fetch a stream's records from an HTTP API source 519 520 :param records_schema: json schema to describe record 521 :param stream_slice: The stream slice to read data for 522 :return: The records read from the API source 523 """ 524 _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check 525 526 most_recent_record_from_slice = None 527 record_generator = partial( 528 self._parse_records, 529 stream_slice=stream_slice, 530 stream_state=self.state or {}, 531 records_schema=records_schema, 532 ) 533 534 if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): 535 stream_state = self.state 
536 537 # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to 538 # fetch more records. The platform deletes stream state for full refresh streams before starting a 539 # new job, so we don't need to worry about this value existing for the initial attempt 540 if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): 541 return 542 543 yield from self._read_single_page(record_generator, stream_state, _slice) 544 else: 545 for stream_data in self._read_pages(record_generator, self.state, _slice): 546 current_record = self._extract_record(stream_data, _slice) 547 if self.cursor and current_record: 548 self.cursor.observe(_slice, current_record) 549 550 yield stream_data 551 552 if self.cursor: 553 self.cursor.close_slice(_slice) 554 return 555 556 # FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this 557 558 def _extract_record( 559 self, stream_data: StreamData, stream_slice: StreamSlice 560 ) -> Optional[Record]: 561 """ 562 As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize 563 to data to streamline the rest of the process. 564 """ 565 if isinstance(stream_data, Record): 566 # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData` 567 return stream_data 568 elif isinstance(stream_data, (dict, Mapping)): 569 return Record( 570 data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name 571 ) 572 elif isinstance(stream_data, AirbyteMessage) and stream_data.record: 573 return Record( 574 data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data 575 associated_slice=stream_slice, 576 stream_name=self.name, 577 ) 578 return None 579 580 # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever 581 def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore 582 """ 583 Specifies the slices for this stream. See the stream slicing section of the docs for more information. 
584 585 :param sync_mode: 586 :param cursor_field: 587 :param stream_state: 588 :return: 589 """ 590 return self.stream_slicer.stream_slices() 591 592 # todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related 593 # SimpleRetriever state management that is handled by the concurrent CDK Framework: 594 # - ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated 595 # - ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder 596 # - test_simple_retriever.py: Remove all imports and usages of legacy cursor components 597 # - test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed 598 @property 599 def state(self) -> Mapping[str, Any]: 600 return self.cursor.get_stream_state() if self.cursor else {} 601 602 @state.setter 603 def state(self, value: StreamState) -> None: 604 """State setter, accept state serialized by state getter.""" 605 if self.cursor: 606 self.cursor.set_initial_state(value) 607 608 def _parse_records( 609 self, 610 response: Optional[requests.Response], 611 stream_state: Mapping[str, Any], 612 records_schema: Mapping[str, Any], 613 stream_slice: Optional[StreamSlice], 614 ) -> Iterable[Record]: 615 yield from self._parse_response( 616 response, 617 stream_slice=stream_slice, 618 stream_state=stream_state, 619 records_schema=records_schema, 620 ) 621 622 def must_deduplicate_query_params(self) -> bool: 623 return True 624 625 @staticmethod 626 def _to_partition_key(to_serialize: Any) -> str: 627 # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value 628 return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves records by synchronously sending requests to the API source.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, submit requests until there are no more pages of records to fetch.
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- stream_name (str): The stream's name
- stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (Optional[StreamSlicer]): The stream slicer
- cursor (Optional[DeclarativeCursor]): The cursor
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
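As a rough usage sketch (assuming retriever is an already-built SimpleRetriever, for example one produced by the model-to-component factory, and schema is the stream's JSON schema), the surrounding stream drives it roughly like this:

# Iterate the slices produced by the stream slicer and read each one;
# request construction and pagination are handled inside the retriever.
for stream_slice in retriever.stream_slices():
    for record in retriever.read_records(records_schema=schema, stream_slice=stream_slice):
        print(record)

# State observed by the cursor so far (an empty mapping if no cursor is configured).
print(retriever.state)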
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice)
            return
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Returns
An iterable of stream slices for this stream, as produced by the configured stream slicer.
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
19@dataclass 20class AsyncRetriever(Retriever): 21 config: Config 22 parameters: InitVar[Mapping[str, Any]] 23 record_selector: RecordSelector 24 stream_slicer: AsyncJobPartitionRouter 25 slice_logger: AlwaysLogSliceLogger = field( 26 init=False, 27 default_factory=lambda: AlwaysLogSliceLogger(), 28 ) 29 30 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 31 self._parameters = parameters 32 33 @property 34 def exit_on_rate_limit(self) -> bool: 35 """ 36 Whether to exit on rate limit. This is a property of the job repository 37 and not the stream slicer. The stream slicer is responsible for creating 38 the jobs, but the job repository is responsible for managing the rate 39 limits and other job-related properties. 40 41 Note: 42 - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits 43 - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage 44 to complete the results. 45 """ 46 job_orchestrator = self.stream_slicer._job_orchestrator 47 if job_orchestrator is None: 48 # Default value when orchestrator is not available 49 return False 50 return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore 51 52 @exit_on_rate_limit.setter 53 def exit_on_rate_limit(self, value: bool) -> None: 54 """ 55 Sets the `exit_on_rate_limit` property of the job repository > creation_requester, 56 meaning that the Job cannot be placed / created if the rate limit is reached. 57 Thus no further work on managing jobs is expected to be done. 58 """ 59 job_orchestrator = self.stream_slicer._job_orchestrator 60 if job_orchestrator is not None: 61 job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment] 62 63 @property 64 def state(self) -> StreamState: 65 """ 66 As a first iteration for sendgrid, there is no state to be managed 67 """ 68 return {} 69 70 @state.setter 71 def state(self, value: StreamState) -> None: 72 """ 73 As a first iteration for sendgrid, there is no state to be managed 74 """ 75 pass 76 77 def _get_stream_state(self) -> StreamState: 78 """ 79 Gets the current state of the stream. 80 81 Returns: 82 StreamState: Mapping[str, Any] 83 """ 84 85 return self.state 86 87 def _validate_and_get_stream_slice_jobs( 88 self, stream_slice: Optional[StreamSlice] = None 89 ) -> Iterable[AsyncJob]: 90 """ 91 Validates the stream_slice argument and returns the partition from it. 92 93 Args: 94 stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from. 95 96 Returns: 97 AsyncPartition: The partition extracted from the stream_slice. 98 99 Raises: 100 AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice. 
101 102 """ 103 return stream_slice.extra_fields.get("jobs", []) if stream_slice else [] 104 105 def stream_slices(self) -> Iterable[Optional[StreamSlice]]: 106 yield from self.stream_slicer.stream_slices() 107 108 def read_records( 109 self, 110 records_schema: Mapping[str, Any], 111 stream_slice: Optional[StreamSlice] = None, 112 ) -> Iterable[StreamData]: 113 # emit the slice_descriptor log message, for connector builder TestRead 114 yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore 115 116 stream_state: StreamState = self._get_stream_state() 117 jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice) 118 records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs) 119 120 yield from self.record_selector.filter_and_transform( 121 all_data=records, 122 stream_state=stream_state, 123 records_schema=records_schema, 124 stream_slice=stream_slice, 125 )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
        - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
        - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
          to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the creation_requester cannot place / create the job, it may be because of rate limits.
- If the creation_requester can place / create the job, all other requesters are expected to successfully complete fetching the results.
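A short usage sketch, assuming async_retriever is an already-configured AsyncRetriever:

# Fail the job-creation request instead of retrying indefinitely when rate limited.
async_retriever.exit_on_rate_limit = True

# Reading the property goes through the job repository's creation requester;
# it returns False when no job orchestrator is available yet.
if async_retriever.exit_on_rate_limit:
    ...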
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}
As a first iteration for sendgrid, there is no state to be managed
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()
Returns the stream slices
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
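Slices consumed by this method carry the completed async jobs under extra_fields["jobs"] (see _validate_and_get_stream_slice_jobs above). A sketch of such a slice, where completed_jobs, schema, and async_retriever are hypothetical placeholders and the StreamSlice import path is assumed from the current CDK layout:

from airbyte_cdk.sources.types import StreamSlice  # assumed import path

# The completed jobs whose results should be downloaded live under extra_fields["jobs"].
stream_slice = StreamSlice(
    partition={},
    cursor_slice={},
    extra_fields={"jobs": completed_jobs},  # Iterable[AsyncJob] from the partition router
)

for record in async_retriever.read_records(records_schema=schema, stream_slice=stream_slice):
    print(record)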
651@deprecated( 652 "This class is experimental. Use at your own risk.", 653 category=ExperimentalClassWarning, 654) 655@dataclass 656class LazySimpleRetriever(SimpleRetriever): 657 """ 658 A retriever that supports lazy loading from parent streams. 659 """ 660 661 def _read_pages( 662 self, 663 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 664 stream_state: Mapping[str, Any], 665 stream_slice: StreamSlice, 666 ) -> Iterable[Record]: 667 response = stream_slice.extra_fields["child_response"] 668 if response: 669 last_page_size, last_record = 0, None 670 for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func 671 last_page_size += 1 672 last_record = record 673 yield record 674 675 next_page_token = self._next_page_token(response, last_page_size, last_record, None) 676 if next_page_token: 677 yield from self._paginate( 678 next_page_token, 679 records_generator_fn, 680 stream_state, 681 stream_slice, 682 ) 683 684 yield from [] 685 else: 686 yield from self._read_pages(records_generator_fn, stream_state, stream_slice) 687 688 def _paginate( 689 self, 690 next_page_token: Any, 691 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 692 stream_state: Mapping[str, Any], 693 stream_slice: StreamSlice, 694 ) -> Iterable[Record]: 695 """Handle pagination by fetching subsequent pages.""" 696 pagination_complete = False 697 698 while not pagination_complete: 699 response = self._fetch_next_page(stream_state, stream_slice, next_page_token) 700 last_page_size, last_record = 0, None 701 702 for record in records_generator_fn(response): # type: ignore[call-arg] # only _parse_records expected as a func 703 last_page_size += 1 704 last_record = record 705 yield record 706 707 if not response: 708 pagination_complete = True 709 else: 710 last_page_token_value = ( 711 next_page_token.get("next_page_token") if next_page_token else None 712 ) 713 next_page_token = self._next_page_token( 714 response, last_page_size, last_record, last_page_token_value 715 ) 716 717 if not next_page_token: 718 pagination_complete = True
A retriever that supports lazy loading from parent streams.
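The laziness comes from the parent stream attaching the child's already-fetched first response to the slice it emits; the retriever reads records from that response directly and only issues further requests for subsequent pages. A rough sketch of such a slice, where parent_record and first_child_response are illustrative names and the StreamSlice import path is assumed:

from airbyte_cdk.sources.types import StreamSlice  # assumed import path

# The first page of the child endpoint is carried in the slice under
# extra_fields["child_response"], so no extra request is needed to start yielding records.
stream_slice = StreamSlice(
    partition={"parent_id": parent_record["id"]},
    cursor_slice={},
    extra_fields={"child_response": first_child_response},  # requests.Response
)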
Inherited Members
- SimpleRetriever
- requester
- record_selector
- config
- parameters
- name
- primary_key
- paginator
- stream_slicer
- request_option_provider
- cursor
- ignore_stream_slicer_parameters_on_paginated_requests
- additional_query_properties
- log_formatter
- pagination_tracker_factory
- read_records
- stream_slices
- state
- must_deduplicate_query_params