airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
Responsible for fetching a stream's records from an HTTP API source.
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
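For orientation, a caller drives any concrete retriever by iterating its slices and reading each slice in turn. The helper below is a hypothetical sketch (not part of the CDK); it only assumes the Retriever interface documented above.

from typing import Any, Mapping

from airbyte_cdk.sources.declarative.retrievers import Retriever


def dump_stream(retriever: Retriever, schema: Mapping[str, Any]) -> int:
    """Hypothetical helper: read every record of a stream by iterating its slices."""
    count = 0
    for stream_slice in retriever.stream_slices():
        for record in retriever.read_records(records_schema=schema, stream_slice=stream_slice):
            count += 1  # a real connector would emit the record as an AirbyteMessage
    return count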
    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""
Returns the stream slices
    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """
State getter; should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
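As a rough illustration of this contract, the sketch below (hypothetical, not a CDK class) keeps the state as a small cursor mapping that can be serialized and restored; the cursor field name is made up.

from typing import Any, MutableMapping


class _CursorStateExample:
    """Hypothetical sketch of the state getter/setter contract described above."""

    cursor_field = "updated_at"  # illustrative cursor field

    def __init__(self) -> None:
        self._state: MutableMapping[str, Any] = {}

    @property
    def state(self) -> MutableMapping[str, Any]:
        # e.g. {"updated_at": "2023-01-01T00:00:00Z"} once records have been observed
        return self._state

    @state.setter
    def state(self, value: MutableMapping[str, Any]) -> None:
        self._state = dict(value)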
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[cursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it, else return nothing so the requester is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
        to data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves records by synchronously sending requests to fetch records.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, submit requests until there are no more pages of records to fetch.
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- stream_name (str): The stream's name
- stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (Optional[StreamSlicer]): The stream slicer
- cursor (Optional[DeclarativeCursor]): The cursor
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
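A simplified, non-authoritative sketch of the per-slice loop this orchestration implies is shown below. It uses only calls that appear in the source on this page and omits the initial page token, property chunking, record merging, and cursor bookkeeping that the real _read_pages handles.

from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever


def sketch_read_slice(
    retriever: SimpleRetriever,
    records_schema: Mapping[str, Any],
    stream_slice: Any,
    stream_state: Mapping[str, Any],
) -> Iterable[Any]:
    """Hypothetical illustration of paging through one slice; not the actual implementation."""
    next_page_token: Optional[Mapping[str, Any]] = None
    while True:
        # one request per page, built from the requester, paginator, and slicer options
        response = retriever._fetch_next_page(stream_state, stream_slice, next_page_token)
        page = list(
            retriever._parse_response(
                response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
        )
        yield from page
        if not response:
            return
        last_token_value = next_page_token.get("next_page_token") if next_page_token else None
        next_page_token = retriever._next_page_token(
            response=response,
            last_page_size=len(page),
            last_record=page[-1] if page else None,
            last_page_token_value=last_token_value,
        )
        if not next_page_token:  # the paginator signals there are no more pages
            return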
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
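When the configured cursor is a ResumableFullRefreshCursor, this method takes the _read_single_page branch shown in the source above: one page is read per invocation and the page token travels through stream state between attempts. The snippet below only illustrates the two state shapes involved; the key names come from the source on this page, while the token value is made up.

# Mid-sync, the state produced by _read_single_page carries the next page token, e.g.:
in_progress_state = {"next_page_token": 3}  # the token value depends on the pagination strategy

# Once pagination is exhausted (or the response is empty), the slice is closed with the
# completion sentinel {FULL_REFRESH_SYNC_COMPLETE_KEY: True}, so a retried attempt
# returns immediately without re-reading.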
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}
State getter; should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        self.log_formatter = (
            (
                lambda response: format_http_message(
                    response,
                    f"Stream '{self.name}' request",
                    f"Request performed in order to extract records for stream '{self.name}'",
                    self.name,
                )
            )
            if not self.log_formatter
            else self.log_formatter
        )

        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
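The cap itself is plain itertools.islice over the parent's slices, as the source above shows. A standalone illustration of the same behavior:

from itertools import islice

all_slices = ({"page": i} for i in range(100))  # stand-in for super().stream_slices()
capped = list(islice(all_slices, 5))  # default maximum_number_of_slices is 5
assert len(capped) == 5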
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
          - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
          - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
            to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
        meaning that the Job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the partition from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.

        Returns:
            AsyncPartition: The partition extracted from the stream_slice.

        Raises:
            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.

        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
          - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
          - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
            to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the creation_requester cannot place / create the job, it is likely due to rate limits.
- If the creation_requester can place / create the job, all other requesters should successfully manage to complete the results.
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}
As a first iteration for sendgrid, there is no state to be managed
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()
Returns the stream slices
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
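The sketch below is a simplified restatement of the data flow in the source above, not an alternative implementation: completed async jobs ride on the slice's extra fields, their results are fetched through the partition router, and the record selector filters and transforms them. The helper name is hypothetical.

from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever


def sketch_async_read(
    retriever: AsyncRetriever,
    records_schema: Mapping[str, Any],
    stream_slice: Optional[Any] = None,
) -> Iterable[Mapping[str, Any]]:
    """Hypothetical illustration; mirrors read_records without the slice log message."""
    jobs = stream_slice.extra_fields.get("jobs", []) if stream_slice else []
    records = retriever.stream_slicer.fetch_records(jobs)  # download results of completed jobs
    yield from retriever.record_selector.filter_and_transform(
        all_data=records,
        stream_state=retriever.state,
        records_schema=records_schema,
        stream_slice=stream_slice,
    )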
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            yield from self._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
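Per the _read_pages override above, this retriever expects the parent stream's HTTP response to already be attached to each slice under the "child_response" extra field; pagination then continues from that response. How the response is produced is connector-specific; the snippet below only illustrates the expected slice shape (the import path and field values are assumptions).

from airbyte_cdk.sources.types import StreamSlice  # assumed import path

slice_with_child = StreamSlice(
    partition={"parent_id": "123"},  # illustrative partition
    cursor_slice={},
    extra_fields={"child_response": None},  # a requests.Response from the parent request in practice
)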