airbyte_cdk.sources.declarative.retrievers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:

class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter: should return state in a form that can be serialized to a string and sent to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible but at the same time descriptive enough to restore
        the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter; accepts state serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records(
    self,
    records_schema: Mapping[str, Any],
    stream_slice: Optional[airbyte_cdk.StreamSlice] = None,
) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

Returns the stream slices

state: Mapping[str, Any]
    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter: should return state in a form that can be serialized to a string and sent to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible but at the same time descriptive enough to restore
        the syncing process from the point where it stopped.
        """

State getter: should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible, yet descriptive enough to restore the syncing process from the point where it stopped.
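
Taken together, these four members are the entire contract a concrete retriever must satisfy. A minimal sketch of an implementation follows; the import paths are assumptions (verify them against your CDK version) and the in-memory record source is hypothetical, purely for illustration.

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.types import StreamSlice, StreamState  # assumed path


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that serves records from a preloaded list."""

    def __init__(self, records: List[Mapping[str, Any]]) -> None:
        self._records = records
        self._state: StreamState = {}

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # No paging or slicing here; emit everything we were given.
        yield from self._records

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield None  # a single, undivided slice

    @property
    def state(self) -> StreamState:
        return self._state

    @state.setter
    def state(self, value: StreamState) -> None:
        self._state = value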

@dataclass
class SimpleRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):

@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        name (str): The stream's name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[DeclarativeCursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request_option from the paginator and the stream slicer.
        Raise a ValueError if there's a key collision.
        Return the merged mapping otherwise.
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If it returns a string, it will be sent as is.
        If it returns a dict, it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Note that only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it; otherwise return nothing so the requester's path is used.
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter and normalize
        the data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter; accepts state serialized by the state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators changed in Python 3.4; we explicitly pin our own values to stay stable across versions
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, submit requests until there are no more pages of records to fetch.

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.

Attributes:
  • name (str): The stream's name
  • primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • cursor (Optional[DeclarativeCursor]): The cursor
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
SimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None,
    log_formatter: Optional[Callable[[requests.models.Response], Any]] = None,
)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
Returns

Stream name
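
The interpolation above means the configured name may reference the connector config at runtime. A hedged illustration (the create helper, import path, and template are assumptions based on the CDK's Jinja-style interpolation; verify against your CDK version):

from airbyte_cdk.sources.declarative.interpolation import InterpolatedString  # assumed path

# Hypothetical interpolated stream name, evaluated against the connector config
name = InterpolatedString.create("{{ config['dataset'] }}_events", parameters={})
assert name.eval({"dataset": "prod"}) == "prod_events"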

primary_key: Union[str, List[str], List[List[str]], NoneType]
    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
log_formatter: Optional[Callable[[requests.models.Response], Any]] = None
def read_records(
    self,
    records_schema: Mapping[str, Any],
    stream_slice: Optional[airbyte_cdk.StreamSlice] = None,
) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
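
A hedged sketch of how a caller typically drives this method: iterate the slices produced by the stream slicer, read each slice, and checkpoint state afterwards. The sync_stream helper is hypothetical.

def sync_stream(retriever: SimpleRetriever, records_schema: Mapping[str, Any]) -> Mapping[str, Any]:
    for stream_slice in retriever.stream_slices():
        for stream_data in retriever.read_records(records_schema, stream_slice):
            ...  # emit the record or message downstream
    return retriever.state  # checkpointable state, updated as the cursor closes each slice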

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
        """
        return self.stream_slicer.stream_slices()

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

state: Mapping[str, Any]
    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

State getter: should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible, yet descriptive enough to restore the syncing process from the point where it stopped.
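
Because the getter simply proxies the cursor, checkpointing is a plain mapping round-trip. A hedged sketch (the cursor field and value are hypothetical):

saved_state = retriever.state          # e.g. {"updated_at": "2024-05-01T00:00:00Z"}
# ... persist it, then on the next sync:
new_retriever.state = saved_state      # seeds the cursor via set_initial_state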

def must_deduplicate_query_params(self) -> bool:
    def must_deduplicate_query_params(self) -> bool:
        return True
@dataclass
class SimpleRetrieverTestReadDecorator(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        self.log_formatter = (
            (
                lambda response: format_http_message(
                    response,
                    f"Stream '{self.name}' request",
                    f"Request performed in order to extract records for stream '{self.name}'",
                    self.name,
                )
            )
            if not self.log_formatter
            else self.log_formatter
        )

        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.

SimpleRetrieverTestReadDecorator(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None,
    log_formatter: Optional[Callable[[requests.models.Response], Any]] = None,
    maximum_number_of_slices: int = 5,
)
maximum_number_of_slices: int = 5
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

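
islice simply truncates the slice iterator, so a test read never consumes more than maximum_number_of_slices partitions. The same mechanism in isolation:

from itertools import islice

all_slices = iter(range(100))                  # stand-in for super().stream_slices()
assert list(islice(all_slices, 5)) == [0, 1, 2, 3, 4]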
@dataclass
class AsyncRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place / create the job, it might be due to rate limits.
         - If the `creation_requester` can place / create the job, the other requesters are expected to
           complete their work successfully.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when the orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository's creation_requester,
        meaning that the job cannot be placed / created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Validates the stream_slice argument and returns the jobs from it.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to validate and extract the jobs from.

        Returns:
            Iterable[AsyncJob]: The jobs extracted from the stream_slice.
        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
AsyncRetriever(
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    record_selector: airbyte_cdk.RecordSelector,
    stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter,
)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool
    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place / create the job, it might be due to rate limits.
         - If the `creation_requester` can place / create the job, the other requesters are expected to
           complete their work successfully.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when the orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot place / create the job, it might be due to rate limits.
  • If the creation_requester can place / create the job, the other requesters are expected to complete their work successfully.
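
A hedged usage sketch: setting the flag merely forwards it to the creation requester, so job creation is expected to fail fast on rate limiting rather than wait it out (inferred from the property above, not a documented guarantee).

retriever.exit_on_rate_limit = True    # forwarded to the job creation requester
assert retriever.exit_on_rate_limit    # reads back through the same chain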
state: Mapping[str, Any]
    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

As a first iteration for sendgrid, there is no state to be managed

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

Returns the stream slices

def read_records(
    self,
    records_schema: Mapping[str, Any],
    stream_slice: Optional[airbyte_cdk.StreamSlice] = None,
) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
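
Note that the records come from completed async jobs carried on the slice itself; _validate_and_get_stream_slice_jobs reads them from extra_fields["jobs"]. A slice produced by the AsyncJobPartitionRouter would look roughly like this hypothetical example (completed_job is an assumed AsyncJob, and the import path may vary by CDK version):

from airbyte_cdk.sources.types import StreamSlice  # assumed path

slice_with_jobs = StreamSlice(
    partition={},
    cursor_slice={},
    extra_fields={"jobs": [completed_job]},  # jobs whose results will be fetched
)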

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class LazySimpleRetriever(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            # No pre-fetched child response on the slice: fall back to the standard
            # SimpleRetriever page-reading loop rather than recursing on self.
            yield from super()._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True
A retriever that supports lazy loading from parent streams.
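
The lazy behavior hinges on the parent stream attaching its HTTP response to the slice: _read_pages reads it from extra_fields["child_response"] and only issues requests of its own for subsequent pages. A hedged sketch of such a slice (parent_response is an assumed requests.Response, and the import path may vary by CDK version):

from airbyte_cdk.sources.types import StreamSlice  # assumed path

lazy_slice = StreamSlice(
    partition={"parent_id": "123"},      # hypothetical parent key
    cursor_slice={},
    extra_fields={"child_response": parent_response},
)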

LazySimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None,
    log_formatter: Optional[Callable[[requests.models.Response], Any]] = None,
)