airbyte_cdk.sources.declarative.retrievers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
    SimpleRetrieverTestReadDecorator,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "SimpleRetrieverTestReadDecorator",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @abstractmethod
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Returns the stream slices"""

    @property
    @abstractmethod
    def state(self) -> StreamState:
        """State getter: should return state in a form that can be serialized to a string and sent to the output
        as a STATE AirbyteMessage.

        A good example of a state is a cursor_value:
            {
                self.cursor_field: "cursor_value"
            }

        State should be as small as possible while remaining descriptive enough to restore
        the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    def state(self, value: StreamState) -> None:
        """State setter: accepts state serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

state: Mapping[str, Any]

State getter: should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
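
To make the contract concrete, here is a minimal sketch of a hand-rolled implementation. It is purely illustrative (an in-memory source rather than an HTTP API), and InMemoryRetriever and its record list are hypothetical; real connectors use the concrete retrievers documented below.

from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that serves records from a list instead of an HTTP API."""

    def __init__(self, records: list, cursor_field: str = "updated_at") -> None:
        self._records = records
        self._cursor_field = cursor_field
        self._state: Mapping[str, Any] = {}

    def read_records(self, records_schema: Mapping[str, Any], stream_slice=None) -> Iterable[Mapping[str, Any]]:
        for record in self._records:
            # Track the latest cursor value so state stays small but resumable.
            self._state = {self._cursor_field: record[self._cursor_field]}
            yield record

    def stream_slices(self) -> Iterable[Optional[Any]]:
        yield None  # a single, unpartitioned slice

    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value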

@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        name (str): The stream's name
        primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        cursor (Optional[DeclarativeCursor]): The cursor
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    cursor: Optional[DeclarativeCursor] = None
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get the mapping from the provided method, along with the mapping's keys.
        If the method returns a string, return the string and an empty set.
        If the method returns a dict, return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_state: Optional[StreamData],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request options from the paginator and the stream slicer.
        Raise a ValueError if there is a key collision; return the merged mapping otherwise.
        """
        # FIXME we should eventually remove the usage of stream_state as part of the interpolation

        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g.: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If it returns a string, the string will be sent as is.
        If it returns a dict, the dict will be converted to a URL-encoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        return self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_state: Optional[StreamData] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden.
        """
        body_json = self._get_request_options(
            stream_state,
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it; otherwise return nothing so the requester's path is used.
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state=stream_state,
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state=stream_state,
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request, e.g. to set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        pagination_complete = False
        initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )
        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

            last_page_size = 0
            last_record: Optional[Record] = None
            for record in records_generator_fn(response):
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    pagination_complete = True

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _read_single_page(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[StreamData]:
        initial_token = stream_state.get("next_page_token")
        if initial_token is None:
            initial_token = self._paginator.get_initial_token()
        next_page_token: Optional[Mapping[str, Any]] = (
            {"next_page_token": initial_token} if initial_token else None
        )

        response = self._fetch_next_page(stream_state, stream_slice, next_page_token)

        last_page_size = 0
        last_record: Optional[Record] = None
        for record in records_generator_fn(response):
            last_page_size += 1
            last_record = record
            yield record

        if not response:
            next_page_token = {FULL_REFRESH_SYNC_COMPLETE_KEY: True}
        else:
            last_page_token_value = (
                next_page_token.get("next_page_token") if next_page_token else None
            )
            next_page_token = self._next_page_token(
                response=response,
                last_page_size=last_page_size,
                last_record=last_record,
                last_page_token_value=last_page_token_value,
            ) or {FULL_REFRESH_SYNC_COMPLETE_KEY: True}

        if self.cursor:
            self.cursor.close_slice(
                StreamSlice(cursor_slice=next_page_token, partition=stream_slice.partition)
            )

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source.

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        most_recent_record_from_slice = None
        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor):
            stream_state = self.state

            # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to
            # fetch more records. The platform deletes stream state for full refresh streams before starting a
            # new job, so we don't need to worry about this value existing for the initial attempt
            if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY):
                return

            yield from self._read_single_page(record_generator, stream_state, _slice)
        else:
            for stream_data in self._read_pages(record_generator, self.state, _slice):
                current_record = self._extract_record(stream_data, _slice)
                if self.cursor and current_record:
                    self.cursor.observe(_slice, current_record)

                # Latest record read, not necessarily within slice boundaries.
                # TODO Remove once all custom components implement `observe` method.
                # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
                most_recent_record_from_slice = self._get_most_recent_record(
                    most_recent_record_from_slice, current_record, _slice
                )
                yield stream_data

            if self.cursor:
                self.cursor.close_slice(_slice, most_recent_record_from_slice)
        return

    def _get_most_recent_record(
        self,
        current_most_recent: Optional[Record],
        current_record: Optional[Record],
        stream_slice: StreamSlice,
    ) -> Optional[Record]:
        if self.cursor and current_record:
            if not current_most_recent:
                return current_record
            else:
                return (
                    current_most_recent
                    if self.cursor.is_greater_than_or_equal(current_most_recent, current_record)
                    else current_record
                )
        else:
            return None

    def _extract_record(
        self, stream_data: StreamData, stream_slice: StreamSlice
    ) -> Optional[Record]:
        """
        As we allow the output of _read_pages to be StreamData, it can be multiple things. Therefore, we need to filter out and normalize
        the data to streamline the rest of the process.
        """
        if isinstance(stream_data, Record):
            # Record is not part of `StreamData` but is the most common implementation of `Mapping[str, Any]` which is part of `StreamData`
            return stream_data
        elif isinstance(stream_data, (dict, Mapping)):
            return Record(
                data=dict(stream_data), associated_slice=stream_slice, stream_name=self.name
            )
        elif isinstance(stream_data, AirbyteMessage) and stream_data.record:
            return Record(
                data=stream_data.record.data,  # type:ignore # AirbyteMessage always has record.data
                associated_slice=stream_slice,
                stream_name=self.name,
            )
        return None

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        """
        Specifies the slices for this stream. See the stream slicing section of the docs for more information.
        """
        return self.stream_slicer.stream_slices()

    @property
    def state(self) -> Mapping[str, Any]:
        return self.cursor.get_stream_state() if self.cursor else {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """State setter: accepts state serialized by the state getter."""
        if self.cursor:
            self.cursor.set_initial_state(value)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        stream_state: Mapping[str, Any],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            stream_state=stream_state,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators have changed in Python 3.4. To avoid being impacted by further change, we explicitly specify our own value
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, submit requests until there are no more pages of records to fetch.

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.

Attributes:
  • name (str): The stream's name
  • primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • cursor (Optional[DeclarativeCursor]): The cursor
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
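
In practice the declarative framework builds this object from a manifest, but wiring one by hand shows the orchestration. A rough sketch, where my_requester and my_selector stand in for already-built Requester and HttpSelector instances (both hypothetical here):

from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever

retriever = SimpleRetriever(
    requester=my_requester,        # hypothetical: sends the HTTP requests
    record_selector=my_selector,   # hypothetical: extracts records from responses
    config={"api_key": "..."},
    parameters={},
    name="users",
    primary_key="id",
    # paginator, stream_slicer and cursor fall back to their defaults:
    # NoPagination, SinglePartitionRouter and no incremental cursor.
)

# The read loop mirrors how the framework drives a retriever: one pass of
# read_records per slice produced by the stream slicer.
for _slice in retriever.stream_slices():
    for record in retriever.read_records(records_schema={}, stream_slice=_slice):
        print(record)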
SimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
Returns

Stream name

primary_key: Union[str, List[str], List[List[str]], NoneType]

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Specifies the slices for this stream. See the stream slicing section of the docs for more information.

state: Mapping[str, Any]

State getter: should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while remaining descriptive enough to restore the syncing process from the point where it stopped.
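
For example, with a cursor configured over an updated_at field, seeding and reading state might look like this (the field name and timestamp are illustrative):

# Seed the retriever with state from a prior sync; the setter delegates to
# cursor.set_initial_state, so this is a no-op when no cursor is configured.
retriever.state = {"updated_at": "2023-01-01T00:00:00Z"}

# After reading, the getter reflects whatever the cursor observed.
print(retriever.state)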

def must_deduplicate_query_params(self) -> bool:
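
Relatedly, the _to_partition_key helper in the source above pins JSON separators and sorts keys so that equivalent mappings always serialize to the same string. A standalone check of the same serialization:

import json


def to_partition_key(to_serialize) -> str:
    # Same call as SimpleRetriever._to_partition_key: explicit separators and
    # sorted keys yield a stable, order-insensitive key.
    return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)


assert to_partition_key({"b": 1, "a": 2}) == to_partition_key({"a": 2, "b": 1})
print(to_partition_key({"b": 1, "a": 2}))  # {"a":2,"b":1}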
@dataclass
class SimpleRetrieverTestReadDecorator(SimpleRetriever):
    """
    In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of
    slices that are queried throughout a read command.
    """

    maximum_number_of_slices: int = 5

    def __post_init__(self, options: Mapping[str, Any]) -> None:
        super().__post_init__(options)
        if self.maximum_number_of_slices and self.maximum_number_of_slices < 1:
            raise ValueError(
                f"The maximum number of slices on a test read needs to be strictly positive. Got {self.maximum_number_of_slices}"
            )

    # stream_slices is defined with arguments on http stream and fixing this has a long tail of dependencies. Will be resolved by the decoupling of http stream and simple retriever
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:  # type: ignore
        return islice(super().stream_slices(), self.maximum_number_of_slices)

    def _fetch_next_page(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_state=stream_state,
                stream_slice=stream_slice,
            ),
            stream_state=stream_state,
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=lambda response: format_http_message(
                response,
                f"Stream '{self.name}' request",
                f"Request performed in order to extract records for stream '{self.name}'",
                self.name,
            ),
        )

In some cases, we want to limit the number of requests that are made to the backend source. This class allows for limiting the number of slices that are queried throughout a read command.
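
Reusing the hypothetical components from the SimpleRetriever sketch above, a test read might cap the slice count like so:

from airbyte_cdk.sources.declarative.retrievers import SimpleRetrieverTestReadDecorator

test_retriever = SimpleRetrieverTestReadDecorator(
    requester=my_requester,
    record_selector=my_selector,
    config={"api_key": "..."},
    parameters={},
    name="users",
    primary_key="id",
    maximum_number_of_slices=5,  # must be strictly positive; enforced in __post_init__
)

# stream_slices() now stops after at most 5 slices, thanks to islice.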

SimpleRetrieverTestReadDecorator(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    maximum_number_of_slices: int = 5,
)
maximum_number_of_slices: int = 5
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Specifies the slices for this stream, capped at maximum_number_of_slices. See the stream slicing section of the docs for more information.

@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place / create the job, it may be because of rate limits.
         - If the `creation_requester` can place / create the job, all other requesters should be able
           to complete the job successfully.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property on the job repository's `creation_requester`,
        meaning that the job will not be placed / created if the rate limit is reached,
        and no further job management work is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    @property
    def state(self) -> StreamState:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        return {}

    @state.setter
    def state(self, value: StreamState) -> None:
        """
        As a first iteration for sendgrid, there is no state to be managed
        """
        pass

    def _get_stream_state(self) -> StreamState:
        """
        Gets the current state of the stream.

        Returns:
            StreamState: Mapping[str, Any]
        """

        return self.state

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Extracts the async jobs attached to the given stream_slice.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to extract the jobs from.

        Returns:
            Iterable[AsyncJob]: The jobs carried by the stream_slice's extra_fields, or an empty list.
        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        yield from self.stream_slicer.stream_slices()

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        stream_state: StreamState = self._get_stream_state()
        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state=stream_state,
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
AsyncRetriever(
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    record_selector: airbyte_cdk.RecordSelector,
    stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter,
)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot place / create the job, it may be because of rate limits.
  • If the creation_requester can place / create the job, all other requesters should be able to complete the job successfully.
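
A small usage sketch (assuming async_retriever is an already-built AsyncRetriever whose slicer has an active job orchestrator; otherwise the setter is a no-op and the getter returns False):

# Let the creation requester fail fast instead of waiting out rate limits;
# both accessors delegate to the job repository's creation_requester.
async_retriever.exit_on_rate_limit = True
assert async_retriever.exit_on_rate_limit is True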
state: Mapping[str, Any]

As a first iteration for sendgrid, there is no state to be managed

def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Returns the stream slices

def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source.

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
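
Unlike SimpleRetriever, the records here come from asynchronous jobs carried on the slice rather than from direct HTTP pagination. A schematic driver loop, with async_retriever hypothetical as above:

for _slice in async_retriever.stream_slices():
    # Each slice is expected to carry its AsyncJob handles under
    # extra_fields["jobs"]; read_records fetches and filters their results.
    for message in async_retriever.read_records(records_schema={}, stream_slice=_slice):
        print(message)  # the first yield is a slice-descriptor log message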

@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_state,
                    stream_slice,
                )

            yield from []
        else:
            # Fall back to the standard page-by-page read when no child response is attached to the slice.
            yield from super()._read_pages(records_generator_fn, stream_state, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_state: Mapping[str, Any],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_state, stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True

A retriever that supports lazy loading from parent streams.
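
The laziness hinges on the parent stream attaching its HTTP response to each slice: _read_pages looks it up under extra_fields["child_response"] and only falls back to issuing its own requests when it is absent. A hypothetical slice built that way (assuming parent_response is a requests.Response captured from the parent stream, and that StreamSlice accepts extra_fields as shown):

from airbyte_cdk import StreamSlice

child_slice = StreamSlice(
    partition={"parent_id": "123"},  # illustrative partition
    cursor_slice={},
    extra_fields={"child_response": parent_response},
)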

LazySimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    cursor: Optional[airbyte_cdk.sources.declarative.incremental.DeclarativeCursor] = None,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
)