airbyte_cdk.sources.declarative.retrievers
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter, accept state serialized by state getter."""
Responsible for fetching a stream's records from an HTTP API source.
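For orientation, a hedged sketch of a minimal concrete implementation of this interface; the InMemoryRetriever class, its fixed record list, and the import paths outside this package are illustrative assumptions, not part of the CDK:

    from typing import Any, Iterable, Mapping, Optional

    from airbyte_cdk.sources.declarative.retrievers import Retriever
    from airbyte_cdk.sources.types import StreamSlice, StreamState  # assumed import path


    class InMemoryRetriever(Retriever):
        """Illustrative retriever that serves records from a fixed in-memory list."""

        def __init__(self, records: list) -> None:
            self._records = records
            self._state: StreamState = {}

        def read_records(
            self,
            records_schema: Mapping[str, Any],
            stream_slice: Optional[StreamSlice] = None,
        ) -> Iterable[Mapping[str, Any]]:
            # A real retriever would issue HTTP requests here.
            yield from self._records

        def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
            yield None  # a single, undivided slice

        @property
        def state(self) -> StreamState:
            return self._state

        @state.setter
        def state(self, value: StreamState) -> None:
            self._state = value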
    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: The JSON schema that describes the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""
Returns the stream slices
    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter, should return state in form that can serialized to a string and send to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should try to be as small as possible but at the same time descriptive enough to restore
        syncing process from the point where it stopped.
        """
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to resume the syncing process from the point where it stopped.
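For illustration, a hedged sketch of such a state object for a stream whose cursor field is updated_at (the field name and timestamp are hypothetical):

    import json

    # Hypothetical incremental state: small, yet enough to resume the sync.
    state = {"updated_at": "2023-06-01T00:00:00Z"}

    # It serializes cleanly to a string for emission as a STATE AirbyteMessage.
    serialized = json.dumps(state)  # '{"updated_at": "2023-06-01T00:00:00Z"}'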
51@dataclass 52class SimpleRetriever(Retriever): 53 """ 54 Retrieves records by synchronously sending requests to fetch records. 55 56 The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer. 57 58 For each stream slice, submit requests until there are no more pages of records to fetch. 59 60 This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. 61 As a result, some of the parameters passed to some methods are unused. 62 The two will be decoupled in a future release. 63 64 Attributes: 65 stream_name (str): The stream's name 66 stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key 67 requester (Requester): The HTTP requester 68 record_selector (HttpSelector): The record selector 69 paginator (Optional[Paginator]): The paginator 70 stream_slicer (Optional[StreamSlicer]): The stream slicer 71 cursor (Optional[cursor]): The cursor 72 parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation 73 """ 74 75 requester: Requester 76 record_selector: HttpSelector 77 config: Config 78 parameters: InitVar[Mapping[str, Any]] 79 name: str 80 _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="") 81 primary_key: Optional[Union[str, List[str], List[List[str]]]] 82 _primary_key: str = field(init=False, repr=False, default="") 83 paginator: Optional[Paginator] = None 84 stream_slicer: StreamSlicer = field( 85 default_factory=lambda: SinglePartitionRouter(parameters={}) 86 ) 87 request_option_provider: RequestOptionsProvider = field( 88 default_factory=lambda: DefaultRequestOptionsProvider(parameters={}) 89 ) 90 cursor: Optional[DeclarativeCursor] = None 91 ignore_stream_slicer_parameters_on_paginated_requests: bool = False 92 93 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 94 self._paginator = self.paginator or NoPagination(parameters=parameters) 95 self._parameters = parameters 96 self._name = ( 97 InterpolatedString(self._name, parameters=parameters) 98 if isinstance(self._name, str) 99 else self._name 100 ) 101 102 @property # type: ignore 103 def name(self) -> str: 104 """ 105 :return: Stream name 106 """ 107 return ( 108 str(self._name.eval(self.config)) 109 if isinstance(self._name, InterpolatedString) 110 else self._name 111 ) 112 113 @name.setter 114 def name(self, value: str) -> None: 115 if not isinstance(value, property): 116 self._name = value 117 118 def _get_mapping( 119 self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any 120 ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]: 121 """ 122 Get mapping from the provided method, and get the keys of the mapping. 123 If the method returns a string, it will return the string and an empty set. 124 If the method returns a dict, it will return the dict and its keys. 125 """ 126 mapping = method(**kwargs) or {} 127 keys = set(mapping.keys()) if not isinstance(mapping, str) else set() 128 return mapping, keys 129 130 def _get_request_options( 131 self, 132 stream_state: Optional[StreamData], 133 stream_slice: Optional[StreamSlice], 134 next_page_token: Optional[Mapping[str, Any]], 135 paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 136 stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]], 137 ) -> Union[Mapping[str, Any], str]: 138 """ 139 Get the request_option from the paginator and the stream slicer. 
140 Raise a ValueError if there's a key collision 141 Returned merged mapping otherwise 142 """ 143 # FIXME we should eventually remove the usage of stream_state as part of the interpolation 144 145 is_body_json = paginator_method.__name__ == "get_request_body_json" 146 147 mappings = [ 148 paginator_method( 149 stream_slice=stream_slice, 150 next_page_token=next_page_token, 151 ), 152 ] 153 if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests: 154 mappings.append( 155 stream_slicer_method( 156 stream_slice=stream_slice, 157 next_page_token=next_page_token, 158 ) 159 ) 160 return combine_mappings(mappings, allow_same_value_merge=is_body_json) 161 162 def _request_headers( 163 self, 164 stream_state: Optional[StreamData] = None, 165 stream_slice: Optional[StreamSlice] = None, 166 next_page_token: Optional[Mapping[str, Any]] = None, 167 ) -> Mapping[str, Any]: 168 """ 169 Specifies request headers. 170 Authentication headers will overwrite any overlapping headers returned from this method. 171 """ 172 headers = self._get_request_options( 173 stream_state, 174 stream_slice, 175 next_page_token, 176 self._paginator.get_request_headers, 177 self.request_option_provider.get_request_headers, 178 ) 179 if isinstance(headers, str): 180 raise ValueError("Request headers cannot be a string") 181 return {str(k): str(v) for k, v in headers.items()} 182 183 def _request_params( 184 self, 185 stream_state: Optional[StreamData] = None, 186 stream_slice: Optional[StreamSlice] = None, 187 next_page_token: Optional[Mapping[str, Any]] = None, 188 ) -> Mapping[str, Any]: 189 """ 190 Specifies the query parameters that should be set on an outgoing HTTP request given the inputs. 191 192 E.g: you might want to define query parameters for paging if next_page_token is not None. 193 """ 194 params = self._get_request_options( 195 stream_state, 196 stream_slice, 197 next_page_token, 198 self._paginator.get_request_params, 199 self.request_option_provider.get_request_params, 200 ) 201 if isinstance(params, str): 202 raise ValueError("Request params cannot be a string") 203 return params 204 205 def _request_body_data( 206 self, 207 stream_state: Optional[StreamData] = None, 208 stream_slice: Optional[StreamSlice] = None, 209 next_page_token: Optional[Mapping[str, Any]] = None, 210 ) -> Union[Mapping[str, Any], str]: 211 """ 212 Specifies how to populate the body of the request with a non-JSON payload. 213 214 If returns a ready text that it will be sent as is. 215 If returns a dict that it will be converted to a urlencoded form. 216 E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2" 217 218 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 219 """ 220 return self._get_request_options( 221 stream_state, 222 stream_slice, 223 next_page_token, 224 self._paginator.get_request_body_data, 225 self.request_option_provider.get_request_body_data, 226 ) 227 228 def _request_body_json( 229 self, 230 stream_state: Optional[StreamData] = None, 231 stream_slice: Optional[StreamSlice] = None, 232 next_page_token: Optional[Mapping[str, Any]] = None, 233 ) -> Optional[Mapping[str, Any]]: 234 """ 235 Specifies how to populate the body of the request with a JSON payload. 236 237 At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden. 
238 """ 239 body_json = self._get_request_options( 240 stream_state, 241 stream_slice, 242 next_page_token, 243 self._paginator.get_request_body_json, 244 self.request_option_provider.get_request_body_json, 245 ) 246 if isinstance(body_json, str): 247 raise ValueError("Request body json cannot be a string") 248 return body_json 249 250 def _paginator_path( 251 self, 252 next_page_token: Optional[Mapping[str, Any]] = None, 253 stream_state: Optional[Mapping[str, Any]] = None, 254 stream_slice: Optional[StreamSlice] = None, 255 ) -> Optional[str]: 256 """ 257 If the paginator points to a path, follow it, else return nothing so the requester is used. 258 :param next_page_token: 259 :return: 260 """ 261 return self._paginator.path( 262 next_page_token=next_page_token, 263 stream_state=stream_state, 264 stream_slice=stream_slice, 265 ) 266 267 def _parse_response( 268 self, 269 response: Optional[requests.Response], 270 stream_state: StreamState, 271 records_schema: Mapping[str, Any], 272 stream_slice: Optional[StreamSlice] = None, 273 next_page_token: Optional[Mapping[str, Any]] = None, 274 ) -> Iterable[Record]: 275 if not response: 276 yield from [] 277 else: 278 yield from self.record_selector.select_records( 279 response=response, 280 stream_state=stream_state, 281 records_schema=records_schema, 282 stream_slice=stream_slice, 283 next_page_token=next_page_token, 284 ) 285 286 @property # type: ignore 287 def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: 288 """The stream's primary key""" 289 return self._primary_key 290 291 @primary_key.setter 292 def primary_key(self, value: str) -> None: 293 if not isinstance(value, property): 294 self._primary_key = value 295 296 def _next_page_token( 297 self, 298 response: requests.Response, 299 last_page_size: int, 300 last_record: Optional[Record], 301 last_page_token_value: Optional[Any], 302 ) -> Optional[Mapping[str, Any]]: 303 """ 304 Specifies a pagination strategy. 305 306 The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params. 307 308 :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response. 
309 """ 310 return self._paginator.next_page_token( 311 response=response, 312 last_page_size=last_page_size, 313 last_record=last_record, 314 last_page_token_value=last_page_token_value, 315 ) 316 317 def _fetch_next_page( 318 self, 319 stream_state: Mapping[str, Any], 320 stream_slice: StreamSlice, 321 next_page_token: Optional[Mapping[str, Any]] = None, 322 ) -> Optional[requests.Response]: 323 return self.requester.send_request( 324 path=self._paginator_path( 325 next_page_token=next_page_token, 326 stream_state=stream_state, 327 stream_slice=stream_slice, 328 ), 329 stream_state=stream_state, 330 stream_slice=stream_slice, 331 next_page_token=next_page_token, 332 request_headers=self._request_headers( 333 stream_state=stream_state, 334 stream_slice=stream_slice, 335 next_page_token=next_page_token, 336 ), 337 request_params=self._request_params( 338 stream_state=stream_state, 339 stream_slice=stream_slice, 340 next_page_token=next_page_token, 341 ), 342 request_body_data=self._request_body_data( 343 stream_state=stream_state, 344 stream_slice=stream_slice, 345 next_page_token=next_page_token, 346 ), 347 request_body_json=self._request_body_json( 348 stream_state=stream_state, 349 stream_slice=stream_slice, 350 next_page_token=next_page_token, 351 ), 352 ) 353 354 # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well. 355 def _read_pages( 356 self, 357 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 358 stream_state: Mapping[str, Any], 359 stream_slice: StreamSlice, 360 ) -> Iterable[Record]: 361 pagination_complete = False 362 initial_token = self._paginator.get_initial_token() 363 next_page_token: Optional[Mapping[str, Any]] = ( 364 {"next_page_token": initial_token} if initial_token else None 365 ) 366 while not pagination_complete: 367 response = self._fetch_next_page(stream_state, stream_slice, next_page_token) 368 369 last_page_size = 0 370 last_record: Optional[Record] = None 371 for record in records_generator_fn(response): 372 last_page_size += 1 373 last_record = record 374 yield record 375 376 if not response: 377 pagination_complete = True 378 else: 379 last_page_token_value = ( 380 next_page_token.get("next_page_token") if next_page_token else None 381 ) 382 next_page_token = self._next_page_token( 383 response=response, 384 last_page_size=last_page_size, 385 last_record=last_record, 386 last_page_token_value=last_page_token_value, 387 ) 388 if not next_page_token: 389 pagination_complete = True 390 391 # Always return an empty generator just in case no records were ever yielded 392 yield from [] 393 394 def _read_single_page( 395 self, 396 records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]], 397 stream_state: Mapping[str, Any], 398 stream_slice: StreamSlice, 399 ) -> Iterable[StreamData]: 400 initial_token = stream_state.get("next_page_token") 401 if initial_token is None: 402 initial_token = self._paginator.get_initial_token() 403 next_page_token: Optional[Mapping[str, Any]] = ( 404 {"next_page_token": initial_token} if initial_token else None 405 ) 406 407 response = self._fetch_next_page(stream_state, stream_slice, next_page_token) 408 409 last_page_size = 0 410 last_record: Optional[Record] = None 411 for record in records_generator_fn(response): 412 last_page_size += 1 413 last_record = record 414 yield record 415 416 if not response: 417 next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True} 418 else: 419 last_page_token_value = ( 
420 next_page_token.get("next_page_token") if next_page_token else None 421 ) 422 next_page_token = self._next_page_token( 423 response=response, 424 last_page_size=last_page_size, 425 last_record=last_record, 426 last_page_token_value=last_page_token_value, 427 ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True} 428 429 if self.cursor: 430 self.cursor.close_slice( 431 StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition) 432 ) 433 434 # Always return an empty generator just in case no records were ever yielded 435 yield from [] 436 437 def read_records( 438 self, 439 records_schema: Mapping[str, Any], 440 stream_slice: Optional[StreamSlice] = None, 441 ) -> Iterable[StreamData]: 442 """ 443 Fetch a stream's records from an HTTP API source 444 445 :param records_schema: json schema to describe record 446 :param stream_slice: The stream slice to read data for 447 :return: The records read from the API source 448 """ 449 _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check 450 451 most_recent_record_from_slice = None 452 record_generator = partial( 453 self._parse_records, 454 stream_slice=stream_slice, 455 stream_state=self.state or {}, 456 records_schema=records_schema, 457 ) 458 459 if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): 460 stream_state = self.state 461 462 # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to 463 # fetch more records. The platform deletes stream state for full refresh streams before starting a 464 # new job, so we don't need to worry about this value existing for the initial attempt 465 if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): 466 return 467 468 yield from self._read_single_page(record_generator, stream_state, _slice) 469 else: 470 for stream_data in self._read_pages(record_generator, self.state, _slice): 471 current_record = self._extract_record(stream_data, _slice) 472 if self.cursor and current_record: 473 self.cursor.observe(_slice, current_record) 474 475 # Latest record read, not necessarily within slice boundaries. 476 # TODO Remove once all custom components implement `observe` method. 477 # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 478 most_recent_record_from_slice = self._get_most_recent_record( 479 most_recent_record_from_slice, current_record, _slice 480 ) 481 yield stream_data 482 483 if self.cursor: 484 self.cursor.close_slice(_slice, most_recent_record_from_slice) 485 return 486 487 def _get_most_recent_record( 488 self, 489 current_most_recent: Optional[Record], 490 current_record: Optional[Record], 491 stream_slice: StreamSlice, 492 ) -> Optional[Record]: 493 if self.cursor and current_record: 494 if not current_most_recent: 495 return current_record 496 else: 497 return ( 498 current_most_recent 499 if self.cursor.is_greater_than_or_equal(current_most_recent, current_record) 500 else current_record 501 ) 502 else: 503 return None 504 505 def _extract_record( 506 self, stream_data: StreamData, stream_slice: StreamSlice 507 ) -> Optional[Record]: 508 """ 509 As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize 510 to data to streamline the rest of the process. 
511 """ 512 if isinstance(stream_data, Record): 513 # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData` 514 return stream_data 515 elif isinstance(stream_data, (dict, Mapping)): 516 return Record( 517 data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name 518 ) 519 elif isinstance(stream_data, AirbyteMessage) and stream_data.record: 520 return Record( 521 data=stream_data.record.data, # type:ignore # AirbyteMessage always has record.data 522 associated_slice=stream_slice, 523 stream_name=self.name, 524 ) 525 return None 526 527 # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever 528 def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore 529 """ 530 Specifies the slices for this stream. See the stream slicing section of the docs for more information. 531 532 :param sync_mode: 533 :param cursor_field: 534 :param stream_state: 535 :return: 536 """ 537 return self.stream_slicer.stream_slices() 538 539 @property 540 def state(self) -> Mapping[str, Any]: 541 return self.cursor.get_stream_state() if self.cursor else {} 542 543 @state.setter 544 def state(self, value: StreamState) -> None: 545 """State setter, accept state serialized by state getter.""" 546 if self.cursor: 547 self.cursor.set_initial_state(value) 548 549 def _parse_records( 550 self, 551 response: Optional[requests.Response], 552 stream_state: Mapping[str, Any], 553 records_schema: Mapping[str, Any], 554 stream_slice: Optional[StreamSlice], 555 ) -> Iterable[Record]: 556 yield from self._parse_response( 557 response, 558 stream_slice=stream_slice, 559 stream_state=stream_state, 560 records_schema=records_schema, 561 ) 562 563 def must_deduplicate_query_params(self) -> bool: 564 return True 565 566 @staticmethod 567 def _to_partition_key(to_serialize: Any) -> str: 568 # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value 569 return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
Retrieves records by synchronously sending requests to the API source.
The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
For each stream slice, it submits requests until there are no more pages of records to fetch (see the usage sketch after the attribute list below).
This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
Attributes:
- stream_name (str): The stream's name
- stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
- requester (Requester): The HTTP requester
- record_selector (HttpSelector): The record selector
- paginator (Optional[Paginator]): The paginator
- stream_slicer (Optional[StreamSlicer]): The stream slicer
- cursor (Optional[DeclarativeCursor]): The cursor
- parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
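A hedged sketch of how a fully constructed SimpleRetriever is typically driven; the drive helper, its arguments, and the process callback are hypothetical placeholders, and the loop simply follows the Retriever interface described above:

    def drive(retriever, records_schema, process):
        """Hypothetical driver: read every slice of an already-built SimpleRetriever."""
        # Iterate the slices produced by the stream slicer and read each one page by page.
        for stream_slice in retriever.stream_slices():
            for record in retriever.read_records(records_schema=records_schema, stream_slice=stream_slice):
                process(record)  # hypothetical downstream handler
        # After reading, the retriever exposes the cursor's view of progress.
        return retriever.state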
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns
Stream name
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key
The stream's primary key
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: json schema to describe record
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
            return
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: The JSON schema that describes the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
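To make the resumable-full-refresh branch concrete, a hedged sketch of the two checkpoint shapes the method distinguishes (FULL_REFRESH_SYNC_COMPLETE_KEY is the constant this module imports; the page-token value is hypothetical):

    # Mid-sync checkpoint: the next attempt resumes from this page token.
    in_progress_state = {"next_page_token": {"page": 3}}  # hypothetical token shape

    # Terminal checkpoint written by _read_single_page: on the next attempt,
    # read_records returns immediately without fetching any records.
    completed_state = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}  # constant imported elsewhere in this module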
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.

        :param sync_mode:
        :param cursor_field:
        :param stream_state:
        :return:
        """
        return self.stream_slicer.stream_slices()
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}
State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.
A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }
The state should be as small as possible while remaining descriptive enough to resume the syncing process from the point where it stopped.
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=lambda response: format_http_message(
                response,
                f"Stream '{self.name}' request",
                f"Request performed in order to extract records for stream '{self.name}'",
                self.name,
            ),
        )
In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.
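A hedged sketch of the limiting behaviour: the decorator caps the parent's slice iterator with itertools.islice, so at most maximum_number_of_slices slices are queried during a test read (the generated slices below are placeholders):

    from itertools import islice

    # Stand-in for super().stream_slices(); real slices come from the stream slicer.
    all_slices = ({"partition_id": i} for i in range(100))  # hypothetical slices

    maximum_number_of_slices = 5
    limited = list(islice(all_slices, maximum_number_of_slices))
    print(len(limited))  # 5 -- only the first five slices would be requested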
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
Parameters
- sync_mode:
- cursor_field:
- stream_state:
Returns
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
          - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
          - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
            to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
        meaning that the Job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the partition from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.

        Returns:
            AsyncPartition: The partition extracted from the stream_slice.

        Raises:
            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.

        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
          - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
          - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
            to complete the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.
Note:
- If the creation_requester cannot place / create the job, this may be due to rate limits.
- If the creation_requester can place / create the job, all other requesters are expected to successfully complete their requests and retrieve the results.
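A hedged usage sketch; the fail_fast_on_rate_limit helper and its async_retriever argument are hypothetical stand-ins for an already-built AsyncRetriever:

    def fail_fast_on_rate_limit(async_retriever) -> bool:
        """Hypothetical helper: enable fail-fast job creation and echo the flag back."""
        # Setting is a no-op while the job orchestrator has not been created yet.
        async_retriever.exit_on_rate_limit = True
        # Reading falls back to False when the orchestrator is unavailable.
        return async_retriever.exit_on_rate_limit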
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}
As a first iteration (for the SendGrid source), there is no state to be managed.
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()
Returns the stream slices
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
Fetch a stream's records from an HTTP API source
Parameters
- records_schema: The JSON schema that describes the records
- stream_slice: The stream slice to read data for
Returns
The records read from the API source
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg]  # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            yield from self._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg]  # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
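A hedged sketch of the central assumption behind _read_pages in this class: the parent stream attaches its already-fetched HTTP response to the slice, so the first page of the child stream needs no extra request (the partition key, the parent_http_response variable, and the exact way the slice is built are illustrative):

    from airbyte_cdk.sources.types import StreamSlice  # assumed import path

    # Illustrative slice as a parent-driven partition router might produce it:
    # the parent's requests.Response travels on the slice's extra_fields.
    lazy_slice = StreamSlice(
        partition={"parent_id": "123"},                          # hypothetical partition key
        cursor_slice={},
        extra_fields={"child_response": parent_http_response},   # response captured from the parent stream
    )
    # LazySimpleRetriever parses this response directly and only issues its own
    # requests when the paginator indicates there are more pages to fetch.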