airbyte_cdk.sources.declarative.retrievers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter: returns the state in a form that can be serialized to a string and sent
        to the output as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible while remaining descriptive enough to restore
        the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter, accepts state serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
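
Together with stream_slices below, this is enough to drive a retriever end to end. A minimal sketch of such a driver loop (the function read_full_stream is illustrative, not part of the CDK):

def read_full_stream(retriever, schema):
    # Ask the retriever for its slices, then read every record in each slice.
    for stream_slice in retriever.stream_slices():
        yield from retriever.read_records(records_schema=schema, stream_slice=stream_slice)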

@abstractmethod
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

state: Mapping[str, Any]

State getter: returns the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
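
For orientation, a minimal concrete implementation of the interface might look like the sketch below. It is illustrative only: the fixed record list and in-memory state are stand-ins for the HTTP machinery a real retriever would use.

from typing import Any, Iterable, Mapping

from airbyte_cdk.sources.declarative.retrievers import Retriever


class StaticRetriever(Retriever):
    """Toy retriever that serves a fixed list of records."""

    def __init__(self, records: Iterable[Mapping[str, Any]]) -> None:
        self._records = list(records)
        self._state: Mapping[str, Any] = {}

    def read_records(self, records_schema, stream_slice=None):
        # records_schema and stream_slice are unused in this toy example.
        yield from self._records

    def stream_slices(self):
        yield None  # a single slice covering the whole stream

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value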

@dataclass
class SimpleRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        name (str): The stream's name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[DeclarativeCursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get the mapping from the provided method, along with the mapping's keys.
        If the method returns a string, return the string and an empty set.
        If the method returns a dict, return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request options from the paginator and the stream slicer.
        Raise a ValueError if there is a key collision.
        Return the merged mapping otherwise.
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g. you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If it returns a string, the string is sent as-is.
        If it returns a dict, the dict is converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of 'request_body_data' and 'request_body_json' may be overridden at a time.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it; otherwise return nothing so the requester's path is used.
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g. to set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token is not None else None
        )
        while not pagination_complete:
            property_chunks: List[List[str]] = (
                list(
                    self.additional_query_properties.get_request_property_chunks(
                        stream_slice=stream_slice
                    )
                )
                if self.additional_query_properties
                else [
                    []
                ]  # A single empty property chunk represents the case where property chunking is not configured
            )

            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None
            response: Optional[requests.Response] = None
            for properties in property_chunks:
                if len(properties) > 0:
                    stream_slice = StreamSlice(
                        partition=stream_slice.partition or {},
                        cursor_slice=stream_slice.cursor_slice or {},
                        extra_fields={"query_properties": properties},
                    )

                response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
                for current_record in records_generator_fn(response):
                    if (
                        current_record
                        and self.additional_query_properties
                        and self.additional_query_properties.property_chunking
                    ):
                        merge_key = (
                            self.additional_query_properties.property_chunking.get_merge_key(
                                current_record
                            )
                        )
                        if merge_key:
                            _deep_merge(merged_records[merge_key], current_record)
                        else:
                            # We should still emit records even if the record did not have a merge key
                            last_page_size += 1
                            last_record = current_record
                            yield current_record
                    else:
                        last_page_size += 1
                        last_record = current_record
                        yield current_record

            if (
                self.additional_query_properties
                and self.additional_query_properties.property_chunking
            ):
                for merged_record in merged_records.values():
                    record = Record(
                        data=merged_record, stream_name=self.name, associated_slice=stream_slice
                    )
                    last_page_size += 1
                    last_record = record
                    yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice)
        return

    # FIXME based on the comment above in SimpleRetriever.read_records, it seems like we can tackle https://github.com/airbytehq/airbyte-internal-issues/issues/6955 and remove this

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter
        and normalize the data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter, accepts state serialized by the state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators changed in Python 3.4; to avoid being impacted by future changes, we explicitly specify our own values
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, submit requests until there are no more pages of records to fetch.

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.
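
The loop below sketches that orchestration in simplified form. Every object in it is a duck-typed stand-in: the method names mirror the requester, record selector, paginator, and stream slicer calls in the source above, but this is not the actual implementation.

def read_slices(slicer, requester, selector, paginator):
    # For each slice, page through the API until the paginator stops
    # producing a next-page token.
    for stream_slice in slicer.stream_slices():
        next_page_token = None
        while True:
            response = requester.send_request(
                stream_slice=stream_slice, next_page_token=next_page_token
            )
            yield from selector.select_records(response=response, stream_slice=stream_slice)
            next_page_token = paginator.next_page_token(response)
            if not next_page_token:
                break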

Attributes:
  • name (str): The stream's name
  • primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • cursor (Optional[DeclarativeCursor]): The cursor
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
SimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
Returns

Stream name

primary_key: Union[str, List[str], List[List[str]], NoneType]

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
log_formatter: Optional[Callable[[requests.models.Response], Any]] = None
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

state: Mapping[str, Any]

State getter: returns the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
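
Concretely, for a stream whose cursor field is updated_at, the state flowing through this property might look like the following (hypothetical values; retriever stands for an already-built SimpleRetriever with a cursor):

state = {"updated_at": "2024-05-01T00:00:00Z"}
retriever.state = state    # forwarded to cursor.set_initial_state(...)
resumed = retriever.state  # read back via cursor.get_stream_state()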

def must_deduplicate_query_params(self) -> bool:
@dataclass
class AsyncRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place/create the job, rate limits may have been reached.
         - If the `creation_requester` can place/create the job, all other requesters are expected to
           complete their work successfully.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when the orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property of the job repository's creation_requester,
        meaning that a job cannot be placed/created once the rate limit is reached,
        and no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Extracts the async jobs from the stream_slice's extra fields, if present.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to extract the jobs from.

        Returns:
            Iterable[AsyncJob]: The jobs attached to the stream_slice, or an empty iterable.
        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
AsyncRetriever( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], record_selector: airbyte_cdk.RecordSelector, stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot place/create the job, rate limits may have been reached.
  • If the creation_requester can place/create the job, all other requesters are expected to complete their work successfully.
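
A short, hypothetical usage sketch (retriever stands for an already-built AsyncRetriever):

# Fail fast instead of retrying when the job-creation request is rate limited.
retriever.exit_on_rate_limit = True
# Note: the getter returns False while no job orchestrator is available yet.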
state: Mapping[str, Any]

As a first iteration for sendgrid, there is no state to be managed

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class LazySimpleRetriever(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            # No pre-fetched child response: fall back to the standard paging loop.
            # (Recursing into self._read_pages here would loop forever, since the
            # slice's child_response would still be missing.)
            yield from super()._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True

A retriever that supports lazy loading from parent streams.

LazySimpleRetriever( requester: airbyte_cdk.Requester, record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], name: str = <property object>, primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>, paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None, stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>, request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>, cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None, ignore_stream_slicer_parameters_on_paginated_requests: bool = False, additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None, log_formatter: Optional[Callable[[requests.models.Response], Any]] = None)
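
As the _read_pages source above shows, this retriever expects each slice to carry the parent stream's HTTP response under the child_response extra field. A hedged sketch of building such a slice (make_lazy_slice and its arguments are hypothetical; the import path assumes a recent CDK layout):

from airbyte_cdk.sources.types import StreamSlice


def make_lazy_slice(parent_id, parent_response):
    # parent_response: the requests.Response captured while reading the parent stream
    return StreamSlice(
        partition={"parent_id": parent_id},
        cursor_slice={},
        extra_fields={"child_response": parent_response},
    )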