airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
        {
            self.cursor_field: "cursor_value"
        }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
Responsible for fetching a stream's records from an HTTP API source.
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""
Returns the stream slices
    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
        {
            self.cursor_field: "cursor_value"
        }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
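To make the contract above concrete, here is a minimal, hypothetical implementation of the Retriever interface that serves records from an in-memory list. It is a sketch only: the class, its data, and the import paths (assumed from recent CDK layouts) are not part of this module.

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.streams.core import StreamData  # assumed location of the StreamData alias
from airbyte_cdk.sources.types import StreamSlice, StreamState  # assumed location of StreamSlice/StreamState


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that yields records from a static list."""

    def __init__(self, records: List[Mapping[str, Any]]) -> None:
        self._records = records
        self._state: StreamState = {}

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # records_schema is ignored here; a real retriever would use it to shape or validate records
        yield from self._records

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        # A single, unpartitioned slice
        yield StreamSlice(partition={}, cursor_slice={})

    @property
    def state(self) -> StreamState:
        return self._state

    @state.setter
    def state(self, value: StreamState) -> None:
        self._state = value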
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[cursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it, else return nothing so the requester is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
        to data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves records by synchronously sending HTTP requests to the API source.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, submit requests until there are no more pages of records to fetch.
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- name (str): The stream's name
- primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (StreamSlicer): The stream slicer; defaults to a SinglePartitionRouter
- cursor (Optional[DeclarativeCursor]): The cursor
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
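The orchestration described above reduces, per stream slice, to a request/parse/paginate loop. The following simplified sketch uses the private helpers shown in the source (_fetch_next_page, _parse_records, _next_page_token) but omits property chunking, record merging, cursor observation, and resumable-full-refresh handling, so it is illustrative rather than a drop-in replacement for _read_pages.

def read_one_slice(retriever, stream_state, stream_slice, records_schema):
    # Simplified per-slice loop of a SimpleRetriever-like orchestrator (sketch only).
    next_page_token = None
    while True:
        # 1. The requester sends the request; the paginator and the request options
        #    provider / stream slicer contribute headers, params, and body options.
        response = retriever._fetch_next_page(stream_state, stream_slice, next_page_token)

        # 2. The record selector extracts and transforms the records of this page.
        last_record, page_size = None, 0
        for record in retriever._parse_records(
            response,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        ):
            page_size += 1
            last_record = record
            yield record

        # 3. The paginator decides whether another page should be fetched.
        if not response:
            return
        next_page_token = retriever._next_page_token(
            response=response,
            last_page_size=page_size,
            last_record=last_record,
            last_page_token_value=(next_page_token or {}).get("next_page_token"),
        )
        if not next_page_token:
            return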
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
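Assuming a SimpleRetriever has already been assembled (for example by the declarative manifest factory), a caller drives it by iterating the slices and reading records per slice. The retriever and records_schema names below are placeholders for an existing instance and the stream's JSON schema.

def sync_stream(retriever, records_schema):
    # Hypothetical driver loop over an already-built SimpleRetriever.
    for stream_slice in retriever.stream_slices():
        for record in retriever.read_records(records_schema, stream_slice):
            print(record)  # stand-in for whatever the caller does with each record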
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode, cursor_field, stream_state: unused; these are leftovers from the HttpStream interface and are not accepted by this method
Returns
The stream slices produced by the stream slicer
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
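Because SimpleRetriever delegates state to its cursor (the setter forwards to cursor.set_initial_state and the getter returns cursor.get_stream_state, or {} when no cursor is configured), resuming a sync amounts to assigning the previously checkpointed mapping before reading. A short sketch; retriever is assumed to be an existing instance and the state value is illustrative only.

previous_state = {"updated_at": "2023-01-01T00:00:00Z"}  # illustrative cursor value
retriever.state = previous_state   # forwarded to cursor.set_initial_state(...)

# ... read the stream as usual ...

latest_state = retriever.state     # cursor.get_stream_state(), or {} without a cursor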
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=lambda response: format_http_message(
                response,
                f"Stream '{self.name}' request",
                f"Request performed in order to extract records for stream '{self.name}'",
                self.name,
            ),
        )
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.
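The limiting mechanism is simply itertools.islice applied to the wrapped retriever's slices, as the small self-contained illustration below shows (the integers stand in for real StreamSlice objects):

from itertools import islice

def limit_slices(all_slices, maximum_number_of_slices=5):
    # Same truncation that SimpleRetrieverTestReadDecorator.stream_slices applies.
    return islice(all_slices, maximum_number_of_slices)

limited = list(limit_slices(iter(range(100)), maximum_number_of_slices=5))
assert limited == [0, 1, 2, 3, 4]  # only the first five slices would be queried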
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode, cursor_field, stream_state: unused; these are leftovers from the HttpStream interface and are not accepted by this method
Returns
The stream slices produced by the stream slicer, truncated to the first maximum_number_of_slices slices
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
            - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
            - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
              to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
        meaning that the Job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the partition from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.

        Returns:
            AsyncPartition: The partition extracted from the stream_slice.

        Raises:
            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.

        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
            - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
            - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
              to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the creation_requester cannot place / create the job, it is most likely due to rate limits.
- If the creation_requester can place / create the job, all other requesters should successfully manage to complete the results.
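In practice the flag is toggled as a plain property on the retriever; the async_retriever name below is a placeholder for an already-built AsyncRetriever instance.

# Make the job-creation requester stop (rather than keep retrying) once the API's
# rate limit is hit; no further job management is attempted in that case.
async_retriever.exit_on_rate_limit = True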
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}
As a first iteration for sendgrid, there is no state to be managed
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()
Returns the stream slices
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            yield from self._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
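As the _read_pages override above shows, the lazy behaviour hinges on the parent stream attaching its already-received HTTP response to the child slice under extra_fields["child_response"], so the first page needs no new request. The snippet below is a sketch only: the slice construction, parent_response, lazy_retriever, and records_schema are assumed to come from the surrounding connector setup.

from airbyte_cdk.sources.types import StreamSlice  # assumed import path

child_slice = StreamSlice(
    partition={"parent_id": "123"},  # hypothetical parent key
    cursor_slice={},
    extra_fields={"child_response": parent_response},  # requests.Response captured by the parent read
)

for record in lazy_retriever.read_records(records_schema, child_slice):
    ...  # records of the first page come straight from the parent's response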