airbyte_cdk.sources.declarative.retrievers

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
 6from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
 7from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
 8    LazySimpleRetriever,
 9    SimpleRetriever,
10)
11
12__all__ = [
13    "Retriever",
14    "SimpleRetriever",
15    "AsyncRetriever",
16    "LazySimpleRetriever",
17]
class Retriever:
15class Retriever:
16    """
17    Responsible for fetching a stream's records from an HTTP API source.
18    """
19
20    @abstractmethod
21    def read_records(
22        self,
23        records_schema: Mapping[str, Any],
24        stream_slice: Optional[StreamSlice] = None,
25    ) -> Iterable[StreamData]:
26        """
27        Fetch a stream's records from an HTTP API source
28
29        :param records_schema: json schema to describe record
30        :param stream_slice: The stream slice to read data for
31        :return: The records read from the API source
32        """
33
34    @abstractmethod
35    @deprecated("Stream slicing is being moved to the stream level.")
36    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
37        """Returns the stream slices"""
38
39    @property
40    @abstractmethod
41    @deprecated("State management is being moved to the stream level.")
42    def state(self) -> StreamState:
43        """State getter, should return state in form that can serialized to a string and send to the output
44        as a STATE AirbyteMessage.
45
46        A good example of a state is a cursor_value:
47            {
48                self.cursor_field: "cursor_value"
49            }
50
51         State should try to be as small as possible but at the same time descriptive enough to restore
52         syncing process from the point where it stopped.
53        """
54
55    @state.setter
56    @abstractmethod
57    @deprecated("State management is being moved to the stream level.")
58    def state(self, value: StreamState) -> None:
59        """State setter, accept state serialized by state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
20    @abstractmethod
21    def read_records(
22        self,
23        records_schema: Mapping[str, Any],
24        stream_slice: Optional[StreamSlice] = None,
25    ) -> Iterable[StreamData]:
26        """
27        Fetch a stream's records from an HTTP API source
28
29        :param records_schema: json schema to describe record
30        :param stream_slice: The stream slice to read data for
31        :return: The records read from the API source
32        """

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
@deprecated('Stream slicing is being moved to the stream level.')
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
34    @abstractmethod
35    @deprecated("Stream slicing is being moved to the stream level.")
36    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
37        """Returns the stream slices"""

Returns the stream slices

state: Mapping[str, Any]
39    @property
40    @abstractmethod
41    @deprecated("State management is being moved to the stream level.")
42    def state(self) -> StreamState:
43        """State getter, should return state in form that can serialized to a string and send to the output
44        as a STATE AirbyteMessage.
45
46        A good example of a state is a cursor_value:
47            {
48                self.cursor_field: "cursor_value"
49            }
50
51         State should try to be as small as possible but at the same time descriptive enough to restore
52         syncing process from the point where it stopped.
53        """

State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
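
Putting the three abstract members together, the sketch below is a toy, self-contained illustration of the Retriever contract. It is hypothetical: InMemoryRetriever and the type aliases are stand-ins invented for this example and do not use the CDK base classes.

from typing import Any, Iterable, Mapping, Optional

# Hypothetical stand-ins for the CDK's StreamSlice / StreamState / StreamData types,
# used only to keep the sketch self-contained.
StreamSlice = Mapping[str, Any]
StreamState = Mapping[str, Any]
StreamData = Mapping[str, Any]

class InMemoryRetriever:
    """Toy retriever that serves preloaded records, one stream slice per page."""

    def __init__(self, pages: list) -> None:
        self._pages = pages
        self._state: StreamState = {}

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        # One slice per preloaded page; a real retriever delegates to a stream slicer.
        for index in range(len(self._pages)):
            yield {"page_index": index}

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        index = (stream_slice or {}).get("page_index", 0)
        for record in self._pages[index]:
            # Cursor-style state: small, but enough to resume from where we stopped.
            self._state = {"last_id": record["id"]}
            yield record

    @property
    def state(self) -> StreamState:
        return self._state

    @state.setter
    def state(self, value: StreamState) -> None:
        self._state = value

retriever = InMemoryRetriever(pages=[[{"id": 1}, {"id": 2}], [{"id": 3}]])
for _slice in retriever.stream_slices():
    for record in retriever.read_records(records_schema={}, stream_slice=_slice):
        print(record)
print(retriever.state)  # {'last_id': 3}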

@dataclass
class SimpleRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
 59@dataclass
 60class SimpleRetriever(Retriever):
 61    """
 62    Retrieves records by synchronously sending requests to fetch records.
 63
 64    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.
 65
 66    For each stream slice, submit requests until there are no more pages of records to fetch.
 67
 68    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
 69    As a result, some of the parameters passed to some methods are unused.
 70    The two will be decoupled in a future release.
 71
 72    Attributes:
 73        stream_name (str): The stream's name
 74        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
 75        requester (Requester): The HTTP requester
 76        record_selector (HttpSelector): The record selector
 77        paginator (Optional[Paginator]): The paginator
 78        stream_slicer (Optional[StreamSlicer]): The stream slicer
 79        cursor (Optional[cursor]): The cursor
 80        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
 81    """
 82
 83    requester: Requester
 84    record_selector: HttpSelector
 85    config: Config
 86    parameters: InitVar[Mapping[str, Any]]
 87    name: str
 88    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
 89    primary_key: Optional[Union[str, List[str], List[List[str]]]]
 90    _primary_key: str = field(init=False, repr=False, default="")
 91    paginator: Optional[Paginator] = None
 92    stream_slicer: StreamSlicer = field(
 93        default_factory=lambda: SinglePartitionRouter(parameters={})
 94    )
 95    request_option_provider: RequestOptionsProvider = field(
 96        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
 97    )
 98    cursor: Optional[DeclarativeCursor] = None
 99    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
100    additional_query_properties: Optional[QueryProperties] = None
101    log_formatter: Optional[Callable[[requests.Response], Any]] = None
102    pagination_tracker_factory: Callable[[], PaginationTracker] = field(
103        default_factory=lambda: lambda: PaginationTracker()
104    )
105
106    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
107        # while changing `ModelToComponentFactory.create_simple_retriever` to accept a cursor, the sources implementing
108        # a CustomRetriever inheriting from SimpleRetriever needed to have the following validation added.
109        self.cursor = None if isinstance(self.cursor, Cursor) else self.cursor
110        self._paginator = self.paginator or NoPagination(parameters=parameters)
111        self._parameters = parameters
112        self._name = (
113            InterpolatedString(self._name, parameters=parameters)
114            if isinstance(self._name, str)
115            else self._name
116        )
117
118    @property  # type: ignore
119    def name(self) -> str:
120        """
121        :return: Stream name
122        """
123        return (
124            str(self._name.eval(self.config))
125            if isinstance(self._name, InterpolatedString)
126            else self._name
127        )
128
129    @name.setter
130    def name(self, value: str) -> None:
131        if not isinstance(value, property):
132            self._name = value
133
134    def _get_mapping(
135        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
136    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
137        """
138        Get mapping from the provided method, and get the keys of the mapping.
139        If the method returns a string, it will return the string and an empty set.
140        If the method returns a dict, it will return the dict and its keys.
141        """
142        mapping = method(**kwargs) or {}
143        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
144        return mapping, keys
145
146    def _get_request_options(
147        self,
148        stream_state: Optional[StreamData],
149        stream_slice: Optional[StreamSlice],
150        next_page_token: Optional[Mapping[str, Any]],
151        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
152        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
153    ) -> Union[Mapping[str, Any], str]:
154        """
155        Get the request_option from the paginator and the stream slicer.
156        Raise a ValueError if there's a key collision
157        Returned merged mapping otherwise
158        """
159        # FIXME we should eventually remove the usage of stream_state as part of the interpolation
160
161        is_body_json = paginator_method.__name__ == "get_request_body_json"
162
163        mappings = [
164            paginator_method(
165                stream_slice=stream_slice,
166                next_page_token=next_page_token,
167            ),
168        ]
169        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
170            mappings.append(
171                stream_slicer_method(
172                    stream_slice=stream_slice,
173                    next_page_token=next_page_token,
174                )
175            )
176        return combine_mappings(mappings, allow_same_value_merge=is_body_json)
177
178    def _request_headers(
179        self,
180        stream_state: Optional[StreamData] = None,
181        stream_slice: Optional[StreamSlice] = None,
182        next_page_token: Optional[Mapping[str, Any]] = None,
183    ) -> Mapping[str, Any]:
184        """
185        Specifies request headers.
186        Authentication headers will overwrite any overlapping headers returned from this method.
187        """
188        headers = self._get_request_options(
189            stream_state,
190            stream_slice,
191            next_page_token,
192            self._paginator.get_request_headers,
193            self.request_option_provider.get_request_headers,
194        )
195        if isinstance(headers, str):
196            raise ValueError("Request headers cannot be a string")
197        return {str(k): str(v) for k, v in headers.items()}
198
199    def _request_params(
200        self,
201        stream_state: Optional[StreamData] = None,
202        stream_slice: Optional[StreamSlice] = None,
203        next_page_token: Optional[Mapping[str, Any]] = None,
204    ) -> Mapping[str, Any]:
205        """
206        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.
207
208        E.g: you might want to define query parameters for paging if next_page_token is not None.
209        """
210        params = self._get_request_options(
211            stream_state,
212            stream_slice,
213            next_page_token,
214            self._paginator.get_request_params,
215            self.request_option_provider.get_request_params,
216        )
217        if isinstance(params, str):
218            raise ValueError("Request params cannot be a string")
219        return params
220
221    def _request_body_data(
222        self,
223        stream_state: Optional[StreamData] = None,
224        stream_slice: Optional[StreamSlice] = None,
225        next_page_token: Optional[Mapping[str, Any]] = None,
226    ) -> Union[Mapping[str, Any], str]:
227        """
228        Specifies how to populate the body of the request with a non-JSON payload.
229
230        If this returns a string, it will be sent as is.
231        If this returns a dict, it will be converted to a urlencoded form.
232        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"
233
234        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
235        """
236        return self._get_request_options(
237            stream_state,
238            stream_slice,
239            next_page_token,
240            self._paginator.get_request_body_data,
241            self.request_option_provider.get_request_body_data,
242        )
243
244    def _request_body_json(
245        self,
246        stream_state: Optional[StreamData] = None,
247        stream_slice: Optional[StreamSlice] = None,
248        next_page_token: Optional[Mapping[str, Any]] = None,
249    ) -> Optional[Mapping[str, Any]]:
250        """
251        Specifies how to populate the body of the request with a JSON payload.
252
253        At the same time only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
254        """
255        body_json = self._get_request_options(
256            stream_state,
257            stream_slice,
258            next_page_token,
259            self._paginator.get_request_body_json,
260            self.request_option_provider.get_request_body_json,
261        )
262        if isinstance(body_json, str):
263            raise ValueError("Request body json cannot be a string")
264        return body_json
265
266    def _paginator_path(
267        self,
268        next_page_token: Optional[Mapping[str, Any]] = None,
269        stream_state: Optional[Mapping[str, Any]] = None,
270        stream_slice: Optional[StreamSlice] = None,
271    ) -> Optional[str]:
272        """
273        If the paginator points to a path, follow it, else return nothing so the requester is used.
274        :param next_page_token:
275        :return:
276        """
277        return self._paginator.path(
278            next_page_token=next_page_token,
279            stream_state=stream_state,
280            stream_slice=stream_slice,
281        )
282
283    def _parse_response(
284        self,
285        response: Optional[requests.Response],
286        stream_state: StreamState,
287        records_schema: Mapping[str, Any],
288        stream_slice: Optional[StreamSlice] = None,
289        next_page_token: Optional[Mapping[str, Any]] = None,
290    ) -> Iterable[Record]:
291        if not response:
292            yield from []
293        else:
294            yield from self.record_selector.select_records(
295                response=response,
296                stream_state=stream_state,
297                records_schema=records_schema,
298                stream_slice=stream_slice,
299                next_page_token=next_page_token,
300            )
301
302    @property  # type: ignore
303    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
304        """The stream's primary key"""
305        return self._primary_key
306
307    @primary_key.setter
308    def primary_key(self, value: str) -> None:
309        if not isinstance(value, property):
310            self._primary_key = value
311
312    def _next_page_token(
313        self,
314        response: requests.Response,
315        last_page_size: int,
316        last_record: Optional[Record],
317        last_page_token_value: Optional[Any],
318    ) -> Optional[Mapping[str, Any]]:
319        """
320        Specifies a pagination strategy.
321
322        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.
323
324        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
325        """
326        return self._paginator.next_page_token(
327            response=response,
328            last_page_size=last_page_size,
329            last_record=last_record,
330            last_page_token_value=last_page_token_value,
331        )
332
333    def _fetch_next_page(
334        self,
335        stream_state: Mapping[str, Any],
336        stream_slice: StreamSlice,
337        next_page_token: Optional[Mapping[str, Any]] = None,
338    ) -> Optional[requests.Response]:
339        return self.requester.send_request(
340            path=self._paginator_path(
341                next_page_token=next_page_token,
342                stream_state=stream_state,
343                stream_slice=stream_slice,
344            ),
345            stream_state=stream_state,
346            stream_slice=stream_slice,
347            next_page_token=next_page_token,
348            request_headers=self._request_headers(
349                stream_state=stream_state,
350                stream_slice=stream_slice,
351                next_page_token=next_page_token,
352            ),
353            request_params=self._request_params(
354                stream_state=stream_state,
355                stream_slice=stream_slice,
356                next_page_token=next_page_token,
357            ),
358            request_body_data=self._request_body_data(
359                stream_state=stream_state,
360                stream_slice=stream_slice,
361                next_page_token=next_page_token,
362            ),
363            request_body_json=self._request_body_json(
364                stream_state=stream_state,
365                stream_slice=stream_slice,
366                next_page_token=next_page_token,
367            ),
368            log_formatter=self.log_formatter,
369        )
370
371    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
372    def _read_pages(
373        self,
374        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
375        stream_state: Mapping[str, Any],
376        stream_slice: StreamSlice,
377    ) -> Iterable[Record]:
378        pagination_tracker = self.pagination_tracker_factory()
379        reset_pagination = False
380        next_page_token = self._get_initial_next_page_token()
381        while True:
382            merged_records: MutableMapping[str, Any] = defaultdict(dict)
383            last_page_size = 0
384            last_record: Optional[Record] = None
385
386            response = None
387            try:
388                if (
389                    self.additional_query_properties
390                    and self.additional_query_properties.property_chunking
391                ):
392                    for properties in self.additional_query_properties.get_request_property_chunks(
393                        stream_slice=stream_slice
394                    ):
395                        stream_slice = StreamSlice(
396                            partition=stream_slice.partition or {},
397                            cursor_slice=stream_slice.cursor_slice or {},
398                            extra_fields={"query_properties": properties},
399                        )
400                        response = self._fetch_next_page(
401                            stream_state, stream_slice, next_page_token
402                        )
403
404                        for current_record in records_generator_fn(response):
405                            merge_key = (
406                                self.additional_query_properties.property_chunking.get_merge_key(
407                                    current_record
408                                )
409                            )
410                            if merge_key:
411                                _deep_merge(merged_records[merge_key], current_record)
412                            else:
413                                # We should still emit records even if the record did not have a merge key
414                                pagination_tracker.observe(current_record)
415                                last_page_size += 1
416                                last_record = current_record
417                                yield current_record
418
419                    for merged_record in merged_records.values():
420                        record = Record(
421                            data=merged_record, stream_name=self.name, associated_slice=stream_slice
422                        )
423                        pagination_tracker.observe(record)
424                        last_page_size += 1
425                        last_record = record
426                        yield record
427                else:
428                    response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
429                    for current_record in records_generator_fn(response):
430                        pagination_tracker.observe(current_record)
431                        last_page_size += 1
432                        last_record = current_record
433                        yield current_record
434            except PaginationResetRequiredException:
435                reset_pagination = True
436            else:
437                if not response:
438                    break
439
440            if reset_pagination or pagination_tracker.has_reached_limit():
441                next_page_token = self._get_initial_next_page_token()
442                previous_slice = stream_slice
443                stream_slice = pagination_tracker.reduce_slice_range_if_possible(stream_slice)
444                LOGGER.info(
445                    f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
446                )
447                reset_pagination = False
448            else:
449                last_page_token_value = (
450                    next_page_token.get("next_page_token") if next_page_token else None
451                )
452                next_page_token = self._next_page_token(
453                    response=response,  # type:ignore # we are breaking from the loop on the try/else if there are no response so this should be fine
454                    last_page_size=last_page_size,
455                    last_record=last_record,
456                    last_page_token_value=last_page_token_value,
457                )
458                if not next_page_token:
459                    break
460
461        # Always return an empty generator just in case no records were ever yielded
462        yield from []
463
464    def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]:
465        initial_token = self._paginator.get_initial_token()
466        next_page_token = {"next_page_token": initial_token} if initial_token is not None else None
467        return next_page_token
468
469    def _read_single_page(
470        self,
471        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
472        stream_state: Mapping[str, Any],
473        stream_slice: StreamSlice,
474    ) -> Iterable[StreamData]:
475        initial_token = stream_state.get("next_page_token")
476        if initial_token is None:
477            initial_token = self._paginator.get_initial_token()
478        next_page_token: Optional[Mapping[str, Any]] = (
479            {"next_page_token": initial_token} if initial_token else None
480        )
481
482        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
483
484        last_page_size = 0
485        last_record: Optional[Record] = None
486        for record in records_generator_fn(response):
487            last_page_size += 1
488            last_record = record
489            yield record
490
491        if not response:
492            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
493        else:
494            last_page_token_value = (
495                next_page_token.get("next_page_token") if next_page_token else None
496            )
497            next_page_token = self._next_page_token(
498                response=response,
499                last_page_size=last_page_size,
500                last_record=last_record,
501                last_page_token_value=last_page_token_value,
502            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
503
504        if self.cursor:
505            self.cursor.close_slice(
506                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
507            )
508
509        # Always return an empty generator just in case no records were ever yielded
510        yield from []
511
512    def read_records(
513        self,
514        records_schema: Mapping[str, Any],
515        stream_slice: Optional[StreamSlice] = None,
516    ) -> Iterable[StreamData]:
517        """
518        Fetch a stream's records from an HTTP API source
519
520        :param records_schema: json schema to describe record
521        :param stream_slice: The stream slice to read data for
522        :return: The records read from the API source
523        """
524        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
525
526        most_recent_record_from_slice = None
527        record_generator = partial(
528            self._parse_records,
529            stream_slice=stream_slice,
530            stream_state=self.state or {},
531            records_schema=records_schema,
532        )
533
534        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
535            stream_state = self.state
536
537            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
538            # fetch more records. The platform deletes stream state for full refresh streams before starting a
539            # new job, so we don't need to worry about this value existing for the initial attempt
540            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
541                return
542
543            yield from self._read_single_page(record_generator, stream_state, _slice)
544        else:
545            for stream_data in self._read_pages(record_generator, self.state, _slice):
546                current_record = self._extract_record(stream_data, _slice)
547                if self.cursor and current_record:
548                    self.cursor.observe(_slice, current_record)
549
550                yield stream_data
551
552            if self.cursor:
553                self.cursor.close_slice(_slice)
554        return
555
556    # FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this
557
558    def _extract_record(
559        self, stream_data: StreamData, stream_slice: StreamSlice
560    ) -> Optional[Record]:
561        """
562        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
563        to data to streamline the rest of the process.
564        """
565        if isinstance(stream_data, Record):
566            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
567            return stream_data
568        elif isinstance(stream_data, (dict, Mapping)):
569            return Record(
570                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
571            )
572        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
573            return Record(
574                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
575                associated_slice=stream_slice,
576                stream_name=self.name,
577            )
578        return None
579
580    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
581    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
582        """
583        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
584
585        :param sync_mode:
586        :param cursor_field:
587        :param stream_state:
588        :return:
589        """
590        return self.stream_slicer.stream_slices()
591
592    # todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related
593    #  SimpleRetriever state management that is handled by the concurrent CDK Framework:
594    #  - ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated
595    #  - ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder
596    #  - test_simple_retriever.py: Remove all imports and usages of legacy cursor components
597    #  - test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed
598    @property
599    def state(self) -> Mapping[str, Any]:
600        return self.cursor.get_stream_state() if self.cursor else {}
601
602    @state.setter
603    def state(self, value: StreamState) -> None:
604        """State setter, accept state serialized by state getter."""
605        if self.cursor:
606            self.cursor.set_initial_state(value)
607
608    def _parse_records(
609        self,
610        response: Optional[requests.Response],
611        stream_state: Mapping[str, Any],
612        records_schema: Mapping[str, Any],
613        stream_slice: Optional[StreamSlice],
614    ) -> Iterable[Record]:
615        yield from self._parse_response(
616            response,
617            stream_slice=stream_slice,
618            stream_state=stream_state,
619            records_schema=records_schema,
620        )
621
622    def must_deduplicate_query_params(self) -> bool:
623        return True
624
625    @staticmethod
626    def _to_partition_key(to_serialize: Any) -> str:
627        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
628        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)
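
As a side note on _to_partition_key above: the explicit separators and sort_keys make the serialization compact and deterministic, so equal mappings always produce the same partition key regardless of key order. A quick standard-library check of that behavior:

import json

print(json.dumps({"b": 2, "a": 1}, indent=None, separators=(",", ":"), sort_keys=True))  # {"a":1,"b":2}
print(json.dumps({"a": 1, "b": 2}, indent=None, separators=(",", ":"), sort_keys=True))  # {"a":1,"b":2}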

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, submit requests until there are no more pages of records to fetch.

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
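
The page loop this orchestration implies can be sketched with plain stand-ins. StubRequester, StubSelector and StubPaginator below are hypothetical and much simpler than the real Requester, HttpSelector and Paginator interfaces; the sketch only mirrors the shape of the loop.

from typing import Any, Iterable, Mapping, Optional

# Hypothetical stand-ins for the requester, record selector and paginator.
class StubRequester:
    def send_request(self, page_token: Optional[int]) -> Mapping[str, Any]:
        page = page_token or 0
        records = [{"id": page * 2 + 1}, {"id": page * 2 + 2}]
        return {"records": records, "next": page + 1 if page < 2 else None}

class StubSelector:
    def select_records(self, response: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        yield from response["records"]

class StubPaginator:
    def next_page_token(self, response: Mapping[str, Any]) -> Optional[int]:
        return response["next"]

def read_slice(
    requester: StubRequester, selector: StubSelector, paginator: StubPaginator
) -> Iterable[Mapping[str, Any]]:
    # Same shape as SimpleRetriever's page loop: request a page, select its records,
    # then ask the paginator for the next token and stop when there is none.
    next_token: Optional[int] = None
    while True:
        response = requester.send_request(next_token)
        yield from selector.select_records(response)
        next_token = paginator.next_page_token(response)
        if next_token is None:
            break

print(list(read_slice(StubRequester(), StubSelector(), StubPaginator())))  # ids 1..6

The real SimpleRetriever additionally merges request options from the paginator and the stream slicer, tracks pagination limits, and can reset pagination mid-slice, but the stop condition is the same: a None next-page token ends the loop.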

Attributes:
  • stream_name (str): The stream's name
  • stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • cursor (Optional[cursor]): The cursor
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
SimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.legacy.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, pagination_tracker_factory: Callable[[], airbyte_cdk.sources.declarative.retrievers.pagination_tracker.PaginationTracker] = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
118    @property  # type: ignore
119    def name(self) -> str:
120        """
121        :return: Stream name
122        """
123        return (
124            str(self._name.eval(self.config))
125            if isinstance(self._name, InterpolatedString)
126            else self._name
127        )
Returns

Stream name

primary_key: Union[str, List[str], List[List[str]], NoneType]
302    @property  # type: ignore
303    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
304        """The stream's primary key"""
305        return self._primary_key

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
log_formatter: Optional[Callable[[requests.models.Response], Any]] = None
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
512    def read_records(
513        self,
514        records_schema: Mapping[str, Any],
515        stream_slice: Optional[StreamSlice] = None,
516    ) -> Iterable[StreamData]:
517        """
518        Fetch a stream's records from an HTTP API source
519
520        :param records_schema: json schema to describe record
521        :param stream_slice: The stream slice to read data for
522        :return: The records read from the API source
523        """
524        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
525
526        most_recent_record_from_slice = None
527        record_generator = partial(
528            self._parse_records,
529            stream_slice=stream_slice,
530            stream_state=self.state or {},
531            records_schema=records_schema,
532        )
533
534        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
535            stream_state = self.state
536
537            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
538            # fetch more records. The platform deletes stream state for full refresh streams before starting a
539            # new job, so we don't need to worry about this value existing for the initial attempt
540            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
541                return
542
543            yield from self._read_single_page(record_generator, stream_state, _slice)
544        else:
545            for stream_data in self._read_pages(record_generator, self.state, _slice):
546                current_record = self._extract_record(stream_data, _slice)
547                if self.cursor and current_record:
548                    self.cursor.observe(_slice, current_record)
549
550                yield stream_data
551
552            if self.cursor:
553                self.cursor.close_slice(_slice)
554        return

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
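
When a cursor is configured, read_records lets the cursor observe every emitted record and closes the slice once the page loop is exhausted. Below is a minimal, self-contained sketch of that observe/close pattern; CursorSketch is a made-up stand-in, not the CDK's DeclarativeCursor.

from typing import Any, Iterable, Mapping

# Hypothetical cursor: it only tracks the highest id seen and prints a checkpoint
# when the slice is closed.
class CursorSketch:
    def __init__(self) -> None:
        self.state: Mapping[str, Any] = {}

    def observe(self, stream_slice: Mapping[str, Any], record: Mapping[str, Any]) -> None:
        self.state = {"id": max(record["id"], self.state.get("id", 0))}

    def close_slice(self, stream_slice: Mapping[str, Any]) -> None:
        print(f"checkpoint after slice {stream_slice}: {self.state}")

def read_records(
    pages: Iterable[Iterable[Mapping[str, Any]]], cursor: CursorSketch
) -> Iterable[Mapping[str, Any]]:
    _slice = {"partition": {}}
    for page in pages:  # stands in for the _read_pages generator
        for record in page:
            cursor.observe(_slice, record)  # the cursor sees each record before it is emitted
            yield record
    cursor.close_slice(_slice)  # checkpoint once the slice is exhausted

cursor = CursorSketch()
for record in read_records([[{"id": 1}, {"id": 5}], [{"id": 3}]], cursor):
    pass
print(cursor.state)  # {'id': 5}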

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
581    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
582        """
583        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
584
585        :param sync_mode:
586        :param cursor_field:
587        :param stream_state:
588        :return:
589        """
590        return self.stream_slicer.stream_slices()

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

state: Mapping[str, Any]
598    @property
599    def state(self) -> Mapping[str, Any]:
600        return self.cursor.get_stream_state() if self.cursor else {}

State getter; it should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.

def must_deduplicate_query_params(self) -> bool:
622    def must_deduplicate_query_params(self) -> bool:
623        return True
@dataclass
class AsyncRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
 19@dataclass
 20class AsyncRetriever(Retriever):
 21    config: Config
 22    parameters: InitVar[Mapping[str, Any]]
 23    record_selector: RecordSelector
 24    stream_slicer: AsyncJobPartitionRouter
 25    slice_logger: AlwaysLogSliceLogger = field(
 26        init=False,
 27        default_factory=lambda: AlwaysLogSliceLogger(),
 28    )
 29
 30    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 31        self._parameters = parameters
 32
 33    @property
 34    def exit_on_rate_limit(self) -> bool:
 35        """
 36        Whether to exit on rate limit. This is a property of the job repository
 37        and not the stream slicer. The stream slicer is responsible for creating
 38        the jobs, but the job repository is responsible for managing the rate
 39        limits and other job-related properties.
 40
 41        Note:
 42         - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
 43         - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
 44           to complete the results.
 45        """
 46        job_orchestrator = self.stream_slicer._job_orchestrator
 47        if job_orchestrator is None:
 48            # Default value when orchestrator is not available
 49            return False
 50        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore
 51
 52    @exit_on_rate_limit.setter
 53    def exit_on_rate_limit(self, value: bool) -> None:
 54        """
 55        Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
 56        meaning that the Job cannot be placed / created if the rate limit is reached.
 57        Thus no further work on managing jobs is expected to be done.
 58        """
 59        job_orchestrator = self.stream_slicer._job_orchestrator
 60        if job_orchestrator is not None:
 61            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]
 62
 63    @property
 64    def state(self) -> StreamState:
 65        """
 66        As a first iteration for sendgrid, there is no state to be managed
 67        """
 68        return {}
 69
 70    @state.setter
 71    def state(self, value: StreamState) -> None:
 72        """
 73        As a first iteration for sendgrid, there is no state to be managed
 74        """
 75        pass
 76
 77    def _get_stream_state(self) -> StreamState:
 78        """
 79        Gets the current state of the stream.
 80
 81        Returns:
 82            StreamState: Mapping[str, Any]
 83        """
 84
 85        return self.state
 86
 87    def _validate_and_get_stream_slice_jobs(
 88        self, stream_slice: Optional[StreamSlice] = None
 89    ) -> Iterable[AsyncJob]:
 90        """
 91        Validates the stream_slice argument and returns the partition from it.
 92
 93        Args:
 94            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the partition from.
 95
 96        Returns:
 97            AsyncPartition: The partition extracted from the stream_slice.
 98
 99        Raises:
100            AirbyteTracedException: If the stream_slice is not an instance of StreamSlice or if the partition is not present in the stream_slice.
101
102        """
103        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
104
105    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
106        yield from self.stream_slicer.stream_slices()
107
108    def read_records(
109        self,
110        records_schema: Mapping[str, Any],
111        stream_slice: Optional[StreamSlice] = None,
112    ) -> Iterable[StreamData]:
113        # emit the slice_descriptor log message, for connector builder TestRead
114        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore
115
116        stream_state: StreamState = self._get_stream_state()
117        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
118        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)
119
120        yield from self.record_selector.filter_and_transform(
121            all_data=records,
122            stream_state=stream_state,
123            records_schema=records_schema,
124            stream_slice=stream_slice,
125        )
AsyncRetriever( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], record_selector: airbyte_cdk.RecordSelector, stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool
33    @property
34    def exit_on_rate_limit(self) -> bool:
35        """
36        Whether to exit on rate limit. This is a property of the job repository
37        and not the stream slicer. The stream slicer is responsible for creating
38        the jobs, but the job repository is responsible for managing the rate
39        limits and other job-related properties.
40
41        Note:
42         - If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
43         - If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
44           to complete the results.
45        """
46        job_orchestrator = self.stream_slicer._job_orchestrator
47        if job_orchestrator is None:
48            # Default value when orchestrator is not available
49            return False
50        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot place / create the job, the failure is most likely caused by rate limits.
  • If the creation_requester can place / create the job, the remaining requesters are expected to complete the job and retrieve its results successfully.
state: Mapping[str, Any]
63    @property
64    def state(self) -> StreamState:
65        """
66        As a first iteration for sendgrid, there is no state to be managed
67        """
68        return {}

As a first iteration for sendgrid, there is no state to be managed

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
105    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
106        yield from self.stream_slicer.stream_slices()

Returns the stream slices

def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
108    def read_records(
109        self,
110        records_schema: Mapping[str, Any],
111        stream_slice: Optional[StreamSlice] = None,
112    ) -> Iterable[StreamData]:
113        # emit the slice_descriptor log message, for connector builder TestRead
114        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore
115
116        stream_state: StreamState = self._get_stream_state()
117        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
118        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)
119
120        yield from self.record_selector.filter_and_transform(
121            all_data=records,
122            stream_state=stream_state,
123            records_schema=records_schema,
124            stream_slice=stream_slice,
125        )

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
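
The async flow differs from SimpleRetriever in that records come from previously created jobs carried on the stream slice rather than from page-by-page HTTP requests. Below is a rough, self-contained sketch; StubJob, StubJobRouter and StubSelector are hypothetical stand-ins that only mirror the shape of the calls shown above.

from typing import Any, Iterable, Mapping

# Hypothetical stand-ins for AsyncJob, AsyncJobPartitionRouter and RecordSelector.
class StubJob:
    def __init__(self, urls: list) -> None:
        self.urls = urls

class StubJobRouter:
    def fetch_records(self, jobs: Iterable[StubJob]) -> Iterable[Mapping[str, Any]]:
        for job in jobs:
            for url in job.urls:
                yield {"download_url": url, "status": "completed"}

class StubSelector:
    def filter_and_transform(
        self, all_data: Iterable[Mapping[str, Any]], **_: Any
    ) -> Iterable[Mapping[str, Any]]:
        yield from (record for record in all_data if record["status"] == "completed")

def read_records(
    stream_slice: Mapping[str, Any], router: StubJobRouter, selector: StubSelector
) -> Iterable[Mapping[str, Any]]:
    jobs = stream_slice.get("jobs", [])  # AsyncRetriever pulls these from the slice's extra_fields
    records = router.fetch_records(jobs)
    yield from selector.filter_and_transform(all_data=records)

slice_with_jobs = {"jobs": [StubJob(urls=["https://example.com/export-1.jsonl"])]}
print(list(read_records(slice_with_jobs, StubJobRouter(), StubSelector())))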

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class LazySimpleRetriever(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
651@deprecated(
652    "This class is experimental. Use at your own risk.",
653    category=ExperimentalClassWarning,
654)
655@dataclass
656class LazySimpleRetriever(SimpleRetriever):
657    """
658    A retriever that supports lazy loading from parent streams.
659    """
660
661    def _read_pages(
662        self,
663        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
664        stream_state: Mapping[str, Any],
665        stream_slice: StreamSlice,
666    ) -> Iterable[Record]:
667        response = stream_slice.extra_fields["child_response"]
668        if response:
669            last_page_size, last_record = 0, None
670            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
671                last_page_size += 1
672                last_record = record
673                yield record
674
675            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
676            if next_page_token:
677                yield from self._paginate(
678                    next_page_token,
679                    records_generator_fn,
680                    stream_state,
681                    stream_slice,
682                )
683
684            yield from []
685        else:
686            yield from self._read_pages(records_generator_fn, stream_state, stream_slice)
687
688    def _paginate(
689        self,
690        next_page_token: Any,
691        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
692        stream_state: Mapping[str, Any],
693        stream_slice: StreamSlice,
694    ) -> Iterable[Record]:
695        """Handle pagination by fetching subsequent pages."""
696        pagination_complete = False
697
698        while not pagination_complete:
699            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
700            last_page_size, last_record = 0, None
701
702            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
703                last_page_size += 1
704                last_record = record
705                yield record
706
707            if not response:
708                pagination_complete = True
709            else:
710                last_page_token_value = (
711                    next_page_token.get("next_page_token") if next_page_token else None
712                )
713                next_page_token = self._next_page_token(
714                    response, last_page_size, last_record, last_page_token_value
715                )
716
717                if not next_page_token:
718                    pagination_complete = True

A retriever that supports lazy loading from parent streams.
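
Conceptually, the parent stream attaches the first page of child records to the slice (the child_response extra field), so the retriever can start yielding without an initial request and only issues HTTP requests for later pages. A minimal sketch under that assumption; read_child_pages and fake_fetch are illustrative names, not CDK APIs.

from typing import Any, Callable, Iterable, Mapping, Optional

# The first page ("child_response") is already on the slice; only later pages need a request.
def read_child_pages(
    stream_slice: Mapping[str, Any],
    fetch_next_page: Callable[[Optional[str]], Mapping[str, Any]],
) -> Iterable[Mapping[str, Any]]:
    response: Optional[Mapping[str, Any]] = stream_slice["extra_fields"]["child_response"]
    while response is not None:
        yield from response["records"]
        next_token = response.get("next")
        response = fetch_next_page(next_token) if next_token else None

def fake_fetch(next_token: Optional[str]) -> Mapping[str, Any]:
    return {"records": [{"id": 3}], "next": None}

first_page = {"records": [{"id": 1}, {"id": 2}], "next": "page-2"}
lazy_slice = {"extra_fields": {"child_response": first_page}}
print(list(read_child_pages(lazy_slice, fake_fetch)))  # ids 1, 2, 3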

LazySimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.legacy.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None, pagination_tracker_factory: Callable[[], airbyte_cdk.sources.declarative.retrievers.pagination_tracker.PaginationTracker] = <factory>)