airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
Responsible for fetching a stream's records from an HTTP API source.
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: the JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
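To make the contract above concrete, here is a minimal sketch of how a caller could drive any Retriever implementation. It is illustrative only; `retriever` and `records_schema` are placeholders rather than objects defined in this module.

```python
from typing import Any, Iterable, Mapping


def read_full_stream(retriever, records_schema: Mapping[str, Any]) -> Iterable[Any]:
    """Iterate every slice the retriever produces and read all records for each slice."""
    for stream_slice in retriever.stream_slices():
        # read_records yields StreamData items (records, and possibly log/state messages).
        yield from retriever.read_records(records_schema, stream_slice)
```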
    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""
Returns the stream slices
    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while still being descriptive enough to resume the sync from the point where it stopped.
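For illustration, a state payload following the shape described above might look like the snippet below; the cursor field name `updated_at` is a made-up example, not something defined by this interface.

```python
# Hypothetical state for a stream whose cursor field is "updated_at".
state = {"updated_at": "2024-01-01T00:00:00Z"}

# The setter is expected to round-trip whatever the getter returned:
#   retriever.state = state      # restore a previous sync's position
#   assert retriever.state == state
```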
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[cursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision
        Returned merged mapping otherwise
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If returns a ready text that it will be sent as is.
        If returns a dict that it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it, else return nothing so the requester is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice)
        return

    # FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
        to data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves a stream's records by synchronously sending requests to the API source.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, submit requests until there are no more pages of records to fetch.
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- stream_name (str): The stream's name
- stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (Optional[StreamSlicer]): The stream slicer
- cursor (Optional[cursor]): The cursor
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
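The following self-contained sketch illustrates the orchestration described above, i.e. requesting pages for a slice until the paginator stops producing a token. The `requester`, `record_selector`, and `paginator` objects are stand-ins with assumed method names, not the actual CDK components.

```python
from typing import Any, Iterable


def read_slice(requester, record_selector, paginator, stream_slice) -> Iterable[Any]:
    """Fetch every page of records for a single stream slice (illustrative only)."""
    next_page_token = None
    while True:
        response = requester.send(stream_slice=stream_slice, next_page_token=next_page_token)
        page = list(record_selector.select(response))
        yield from page
        # The paginator decides whether another request is needed for this slice.
        next_page_token = paginator.next_page_token(response, len(page))
        if not next_page_token:
            break
```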
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice)
        return
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: the JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
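As a rough outline of the branching this method performs (not the actual implementation): the resumable-full-refresh path reads a single page per attempt and short-circuits when a prior attempt already completed, while the regular path reads all pages and lets the cursor observe records and close the slice. The helper names below are hypothetical.

```python
FULL_REFRESH_SYNC_COMPLETE_KEY = "__ab_full_refresh_sync_complete"  # assumed value of the CDK constant


def read_records_outline(retriever, records_schema, stream_slice):
    """Condensed restatement of SimpleRetriever.read_records (illustrative only)."""
    if retriever.cursor and is_resumable_full_refresh(retriever.cursor):  # hypothetical check
        if retriever.state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
            return  # a prior attempt already finished this stream
        yield from read_one_page(retriever, records_schema, stream_slice)  # hypothetical helper
    else:
        for record in read_all_pages(retriever, records_schema, stream_slice):  # hypothetical helper
            if retriever.cursor:
                retriever.cursor.observe(stream_slice, record)
            yield record
        if retriever.cursor:
            retriever.cursor.close_slice(stream_slice)
```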
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()
Specifies the slices for this stream. See the stream slicing section of the docs for more information. This method takes no arguments.
Returns
The stream slices produced by the configured stream slicer.
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while still being descriptive enough to resume the sync from the point where it stopped.
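Since SimpleRetriever delegates its state entirely to the configured cursor, setting state before a sync seeds the cursor and reading it afterwards returns the cursor's current stream state. A small illustrative snippet, with `retriever` and the payload as placeholders:

```python
# Seed the cursor with a previously emitted state (forwarded to cursor.set_initial_state).
retriever.state = {"updated_at": "2024-01-01T00:00:00Z"}

# After reading, the getter reflects the cursor's progress
# (cursor.get_stream_state(), or {} when no cursor is configured).
latest_state = retriever.state
```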
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
        - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
        - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
          to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
        meaning that the Job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the partition from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.

        Returns:
            AsyncPartition: The partition extracted from the stream_slice.

        Raises:
            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.

        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
        - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
        - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
          to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the `creation_requester` cannot place / create the job, it is most likely a rate-limit issue.
- If the `creation_requester` can place / create the job, all other requesters are expected to successfully complete their work and retrieve the results.
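As a hedged usage example, the flag can be toggled directly on the retriever; it is forwarded to the job repository's creation requester, so job creation is expected to fail fast rather than keep waiting out the rate limit. `async_retriever` below is a placeholder instance.

```python
# Opt into failing fast when the job-creation request is rate limited.
async_retriever.exit_on_rate_limit = True

if async_retriever.exit_on_rate_limit:
    # The creation requester will surface the rate-limit error rather than keep retrying.
    ...
```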
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}
As a first iteration for sendgrid, there is no state to be managed
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()
Returns the stream slices
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: the JSON schema describing the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
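The snippet below sketches the data flow this method expects, based on _validate_and_get_stream_slice_jobs above: each StreamSlice carries the async jobs for its partition in extra_fields, and the partition router turns those jobs back into records. The import path and the `job`/`retriever` objects are assumptions for illustration.

```python
from airbyte_cdk.sources.types import StreamSlice  # import path assumed

# A slice as the AsyncJobPartitionRouter would produce it: jobs travel on extra_fields.
slice_with_jobs = StreamSlice(
    partition={"account_id": "123"},   # hypothetical partition
    cursor_slice={},
    extra_fields={"jobs": [job]},      # `job` is a completed AsyncJob (placeholder)
)

# read_records then roughly does:
#   jobs = slice_with_jobs.extra_fields.get("jobs", [])
#   records = retriever.stream_slicer.fetch_records(jobs)
#   yield from retriever.record_selector.filter_and_transform(all_data=records, ...)
```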
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            yield from self._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
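Based on the _read_pages override above, the "lazy" part means the first page of child records arrives pre-fetched on the slice itself: the parent stream's response is stored under the "child_response" extra field, and pagination only takes over for any remaining pages. A hedged sketch of such a slice, with placeholder objects:

```python
from airbyte_cdk.sources.types import StreamSlice  # import path assumed

lazy_slice = StreamSlice(
    partition={"parent_id": "42"},                     # hypothetical parent partition
    cursor_slice={},
    extra_fields={"child_response": child_response},   # requests.Response captured during the parent read
)

# records = list(lazy_retriever.read_records(records_schema, lazy_slice))
```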