airbyte_cdk.sources.declarative.retrievers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
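
All public classes are re-exported at the package level per __all__ above, so connector code can import them directly; a minimal sketch:

from airbyte_cdk.sources.declarative.retrievers import (
    AsyncRetriever,
    LazySimpleRetriever,
    Retriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

# Every concrete retriever implements the abstract Retriever interface.
for cls in (SimpleRetriever, SimpleRetrieverTestReadDecorator, AsyncRetriever, LazySimpleRetriever):
    assert issubclass(cls, Retriever)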
class Retriever:
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter; should return state in a form that can be serialized to a string and sent to the
        output as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible while remaining descriptive enough to resume
        the sync from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter; accepts state serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

state: Mapping[str, Any]

State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.
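
To make this contract concrete, here is a minimal hypothetical implementation; the class name and its single-slice, in-memory behavior are invented for illustration and are not part of the CDK:

from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.retrievers import Retriever


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that serves records from an in-memory list."""

    def __init__(self, records: list) -> None:
        self._records = records
        self._state: Mapping[str, Any] = {}

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Mapping[str, Any]]:
        yield from self._records

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        # A single slice covering the whole stream.
        yield StreamSlice(partition={}, cursor_slice={})

    @property
    def state(self) -> Mapping[str, Any]:
        # Small but descriptive, e.g. {"updated_at": "2023-01-01"}.
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value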

@dataclass
class SimpleRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[DeclarativeCursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get the mapping from the provided method, along with the mapping's keys.
        If the method returns a string, return the string and an empty set.
        If the method returns a dict, return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request options from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision; return the merged mapping otherwise.
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If it returns a string, it will be sent as-is.
        If it returns a dict, it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of 'request_body_data' and 'request_body_json' may be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' may be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it; else return nothing so the requester's path is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and we don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to
        filter and normalize the data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter; accepts state serialized by the state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, it submits requests until there are no more pages of records to fetch (a simplified sketch of this loop follows the attribute list below).

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.

Attributes:
  • stream_name (str): The stream's name
  • stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • cursor (Optional[DeclarativeCursor]): The cursor
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
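
The page loop can be sketched as follows. This is an illustrative simplification, not the actual implementation: it assumes pre-built requester, record_selector, and paginator components, and it omits request options, property chunking, and cursor handling.

def read_one_slice(requester, record_selector, paginator, stream_slice, records_schema):
    # Simplified sketch of SimpleRetriever's orchestration for a single slice.
    next_page_token, last_page_token_value = None, None
    while True:
        # The requester sends the HTTP request for the current page.
        response = requester.send_request(
            stream_slice=stream_slice, next_page_token=next_page_token
        )
        # The record selector extracts records from the response.
        last_page_size, last_record = 0, None
        for record in record_selector.select_records(
            response=response,
            stream_state={},
            records_schema=records_schema,
            stream_slice=stream_slice,
        ):
            last_page_size += 1
            last_record = record
            yield record
        # The paginator decides whether another page should be fetched.
        next_page_token = paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )
        if not next_page_token:
            break
        last_page_token_value = next_page_token.get("next_page_token")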
SimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
Returns

Stream name

primary_key: Union[str, List[str], List[List[str]], NoneType]

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
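
In practice the retriever is driven slice by slice; a hedged usage sketch, assuming `retriever` is a fully configured SimpleRetriever and `schema` is the stream's JSON schema:

def dump_stream(retriever, schema):
    # Iterate the slices produced by the stream slicer and read each one.
    for stream_slice in retriever.stream_slices():
        for record in retriever.read_records(records_schema=schema, stream_slice=stream_slice):
            print(record)
    # After reading, `state` holds what would be emitted as a STATE AirbyteMessage.
    print(retriever.state)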

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

state: Mapping[str, Any]

State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to resume the sync from the point where it stopped.

def must_deduplicate_query_params(self) -> bool:
@dataclass
class SimpleRetrieverTestReadDecorator(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=lambda response: format_http_message(
                response,
                f"Stream '{self.name}' request",
                f"Request performed in order to extract records for stream '{self.name}'",
                self.name,
            ),
        )

In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.

SimpleRetrieverTestReadDecorator( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None, maximum_number_of_slices: int = 5)
maximum_number_of_slices: int = 5
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

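
The limiting behavior is plain itertools.islice over the parent's slices, as the source above shows; the truncation can be demonstrated in isolation:

from itertools import islice

# Slices beyond the limit are never requested from the parent generator,
# so no HTTP traffic is produced for them during a test read.
all_slices = ({"partition": n} for n in range(100))
limited = list(islice(all_slices, 5))  # mirrors maximum_number_of_slices = 5
assert len(limited) == 5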
@dataclass
class AsyncRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot create the job, it may be because of rate limits.
         - If the `creation_requester` can create the job, the other requesters should be able
           to successfully complete the job and retrieve the results.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when the orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property on the job repository's creation_requester,
        meaning that the job will not be created if the rate limit is reached,
        and no further job management work is expected.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Returns the async jobs attached to the given stream_slice.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to extract the jobs from.

        Returns:
            Iterable[AsyncJob]: The jobs attached to the stream_slice, or an empty list if there are none.
        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
AsyncRetriever( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], record_selector: airbyte_cdk.RecordSelector, stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot create the job, it may be because of rate limits.
  • If the creation_requester can create the job, the other requesters should be able to successfully complete the job and retrieve the results.
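
A hedged usage sketch, assuming `retriever` is a configured AsyncRetriever whose job orchestrator is available:

def fail_fast_on_rate_limit(retriever) -> None:
    # Forwarded to the job repository's creation_requester: job creation
    # will abort instead of waiting when the rate limit is hit.
    retriever.exit_on_rate_limit = True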
state: Mapping[str, Any]

As a first iteration for sendgrid, there is no state to be managed

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
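
Note that read_records does not create or poll jobs itself; the AsyncJobPartitionRouter is expected to attach the completed jobs to each slice's extra_fields under the "jobs" key, which is where _validate_and_get_stream_slice_jobs looks for them. A hedged illustration:

from airbyte_cdk import StreamSlice

# Hypothetical illustration of the slice shape AsyncRetriever expects.
completed_jobs: list = []  # AsyncJob instances produced by the job orchestrator
slice_with_jobs = StreamSlice(
    partition={},
    cursor_slice={},
    extra_fields={"jobs": completed_jobs},
)
# This is exactly what _validate_and_get_stream_slice_jobs retrieves.
assert slice_with_jobs.extra_fields.get("jobs", []) == completed_jobs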

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class LazySimpleRetriever(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            # Fall back to the standard page-based read when the parent did not attach a
            # response (calling self._read_pages here would recurse infinitely).
            yield from super()._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True

A retriever that supports lazy loading from parent streams.
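
The laziness comes from the parent stream: the parent's already-fetched HTTP response is expected in the slice's extra_fields under "child_response" (see _read_pages above), so the child's first page costs no extra request and pagination only starts afterwards. A hedged illustration of such a slice:

from airbyte_cdk import StreamSlice

# Hypothetical illustration: a parent stream hands its captured response to
# the child stream through the slice, which triggers the lazy read path.
parent_response = ...  # a requests.Response captured while reading the parent
lazy_slice = StreamSlice(
    partition={"parent_id": "123"},
    cursor_slice={},
    extra_fields={"child_response": parent_response},
)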

LazySimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None)