airbyte_cdk.sources.declarative.retrievers

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import (
    LazySimpleRetriever,
    SimpleRetriever,
)

__all__ = [
    "Retriever",
    "SimpleRetriever",
    "AsyncRetriever",
    "LazySimpleRetriever",
]
class Retriever:
class Retriever:
    """
    Responsible for fetching a stream's records from an HTTP API source.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """

    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        yield from []

    @property
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """
        Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        return {}

    @state.setter
    @deprecated("State management is being moved to the stream level.")
    def state(self, value: StreamState) -> None:
        """
        Does nothing as this method is deprecated, so underlying Retriever implementations
        do not need to implement this.
        """
        pass

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@deprecated('Stream slicing is being moved to the stream level.')
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:

Does nothing as this method is deprecated, so underlying Retriever implementations do not need to implement this.

state: Mapping[str, Any]

Does nothing as this method is deprecated, so underlying Retriever implementations do not need to implement this.
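
Because stream_slices and state now ship with deprecated no-op defaults, a concrete retriever only has to implement read_records. A minimal sketch under that assumption (InMemoryRetriever and its records list are hypothetical, purely for illustration):

from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.retrievers import Retriever


class InMemoryRetriever(Retriever):
    """Hypothetical retriever that serves records from a fixed list."""

    def __init__(self, records: List[Mapping[str, Any]]) -> None:
        self._records = records

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # stream_slices() and state keep their inherited no-op defaults,
        # so only the abstract read_records needs an implementation.
        yield from self._records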

@dataclass
class SimpleRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class SimpleRetriever(Retriever):
    """
    Retrieves records by synchronously sending requests to fetch records.

    The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

    For each stream slice, submit requests until there are no more pages of records to fetch.

    This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery.
    As a result, some of the parameters passed to some methods are unused.
    The two will be decoupled in a future release.

    Attributes:
        stream_name (str): The stream's name
        stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
        requester (Requester): The HTTP requester
        record_selector (HttpSelector): The record selector
        paginator (Optional[Paginator]): The paginator
        stream_slicer (Optional[StreamSlicer]): The stream slicer
        parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
    """

    requester: Requester
    record_selector: HttpSelector
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    primary_key: Optional[Union[str, List[str], List[List[str]]]]
    _primary_key: str = field(init=False, repr=False, default="")
    paginator: Optional[Paginator] = None
    stream_slicer: StreamSlicer = field(
        default_factory=lambda: SinglePartitionRouter(parameters={})
    )
    request_option_provider: RequestOptionsProvider = field(
        default_factory=lambda: DefaultRequestOptionsProvider(parameters={})
    )
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False
    additional_query_properties: Optional[QueryProperties] = None
    log_formatter: Optional[Callable[[requests.Response], Any]] = None
    pagination_tracker_factory: Callable[[], PaginationTracker] = field(
        default_factory=lambda: lambda: PaginationTracker()
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._paginator = self.paginator or NoPagination(parameters=parameters)
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def _get_mapping(
        self, method: Callable[..., Optional[Union[Mapping[str, Any], str]]], **kwargs: Any
    ) -> Tuple[Union[Mapping[str, Any], str], Set[str]]:
        """
        Get mapping from the provided method, and get the keys of the mapping.
        If the method returns a string, it will return the string and an empty set.
        If the method returns a dict, it will return the dict and its keys.
        """
        mapping = method(**kwargs) or {}
        keys = set(mapping.keys()) if not isinstance(mapping, str) else set()
        return mapping, keys

    def _get_request_options(
        self,
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        paginator_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
        stream_slicer_method: Callable[..., Optional[Union[Mapping[str, Any], str]]],
    ) -> Union[Mapping[str, Any], str]:
        """
        Get the request options from the paginator and the stream slicer.
        Raise a ValueError if there is a key collision.
        Return the merged mapping otherwise.
        """
        is_body_json = paginator_method.__name__ == "get_request_body_json"

        mappings = [
            paginator_method(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
        ]
        if not next_page_token or not self.ignore_stream_slicer_parameters_on_paginated_requests:
            mappings.append(
                stream_slicer_method(
                    stream_slice=stream_slice,
                    next_page_token=next_page_token,
                )
            )
        return combine_mappings(mappings, allow_same_value_merge=is_body_json)

    def _request_headers(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies request headers.
        Authentication headers will overwrite any overlapping headers returned from this method.
        """
        headers = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_headers,
            self.request_option_provider.get_request_headers,
        )
        if isinstance(headers, str):
            raise ValueError("Request headers cannot be a string")
        return {str(k): str(v) for k, v in headers.items()}

    def _request_params(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.

        E.g: you might want to define query parameters for paging if next_page_token is not None.
        """
        params = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_params,
            self.request_option_provider.get_request_params,
        )
        if isinstance(params, str):
            raise ValueError("Request params cannot be a string")
        return params

    def _request_body_data(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Union[Mapping[str, Any], str]:
        """
        Specifies how to populate the body of the request with a non-JSON payload.

        If it returns a string, it will be sent as is.
        If it returns a dict, it will be converted to a urlencoded form.
        E.g. {"key1": "value1", "key2": "value2"} => "key1=value1&key2=value2"

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
        """
        return self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_data,
            self.request_option_provider.get_request_body_data,
        )

    def _request_body_json(
        self,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies how to populate the body of the request with a JSON payload.

        Only one of the 'request_body_data' and 'request_body_json' functions can be overridden at a time.
        """
        body_json = self._get_request_options(
            stream_slice,
            next_page_token,
            self._paginator.get_request_body_json,
            self.request_option_provider.get_request_body_json,
        )
        if isinstance(body_json, str):
            raise ValueError("Request body json cannot be a string")
        return body_json

    def _paginator_path(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Optional[str]:
        """
        If the paginator points to a path, follow it; else return nothing so the requester's path is used.
        :param next_page_token:
        :return:
        """
        return self._paginator.path(
            next_page_token=next_page_token,
            stream_state={},  # stream_state as an interpolation context is deprecated
            stream_slice=stream_slice,
        )

    def _parse_response(
        self,
        response: Optional[requests.Response],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        if not response:
            yield from []
        else:
            yield from self.record_selector.select_records(
                response=response,
                stream_state={},  # stream_state as an interpolation context is deprecated
                records_schema=records_schema,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )

    @property  # type: ignore
    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
        """The stream's primary key"""
        return self._primary_key

    @primary_key.setter
    def primary_key(self, value: str) -> None:
        if not isinstance(value, property):
            self._primary_key = value

    def _next_page_token(
        self,
        response: requests.Response,
        last_page_size: int,
        last_record: Optional[Record],
        last_page_token_value: Optional[Any],
    ) -> Optional[Mapping[str, Any]]:
        """
        Specifies a pagination strategy.

        The value returned from this method is passed to most other methods in this class. Use it to form a request e.g: set headers or query params.

        :return: The token for the next page from the input response object. Returning None means there are no more pages to read in this response.
        """
        return self._paginator.next_page_token(
            response=response,
            last_page_size=last_page_size,
            last_record=last_record,
            last_page_token_value=last_page_token_value,
        )

    def _fetch_next_page(
        self,
        stream_slice: StreamSlice,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Optional[requests.Response]:
        return self.requester.send_request(
            path=self._paginator_path(
                next_page_token=next_page_token,
                stream_slice=stream_slice,
            ),
            stream_state={},  # stream_state as an interpolation context is deprecated
            stream_slice=stream_slice,
            next_page_token=next_page_token,
            request_headers=self._request_headers(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_params=self._request_params(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_data=self._request_body_data(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            request_body_json=self._request_body_json(
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            ),
            log_formatter=self.log_formatter,
        )

    # This logic is similar to _read_pages in the HttpStream class. When making changes here, consider making changes there as well.
    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        original_stream_slice = stream_slice
        pagination_tracker = self.pagination_tracker_factory()
        reset_pagination = False
        next_page_token = self._get_initial_next_page_token()
        while True:
            merged_records: MutableMapping[str, Any] = defaultdict(dict)
            last_page_size = 0
            last_record: Optional[Record] = None

            response = None
            try:
                if self.additional_query_properties:
                    for (
                        properties
                    ) in self.additional_query_properties.get_request_property_chunks():
                        stream_slice = StreamSlice(
                            partition=stream_slice.partition or {},
                            cursor_slice=stream_slice.cursor_slice or {},
                            extra_fields={"query_properties": properties},
                        )
                        response = self._fetch_next_page(stream_slice, next_page_token)

                        for current_record in records_generator_fn(response):
                            if self.additional_query_properties.property_chunking:
                                merge_key = self.additional_query_properties.property_chunking.get_merge_key(
                                    current_record
                                )
                                if merge_key:
                                    _deep_merge(merged_records[merge_key], current_record)
                                else:
                                    # We should still emit records even if the record did not have a merge key
                                    pagination_tracker.observe(current_record)
                                    last_page_size += 1
                                    last_record = current_record
                                    yield current_record
                            else:
                                pagination_tracker.observe(current_record)
                                last_page_size += 1
                                last_record = current_record
                                yield current_record

                    for merged_record in merged_records.values():
                        record = Record(
                            data=merged_record, stream_name=self.name, associated_slice=stream_slice
                        )
                        pagination_tracker.observe(record)
                        last_page_size += 1
                        last_record = record
                        yield record
                else:
                    response = self._fetch_next_page(stream_slice, next_page_token)
                    for current_record in records_generator_fn(response):
                        pagination_tracker.observe(current_record)
                        last_page_size += 1
                        last_record = current_record
                        yield current_record
            except PaginationResetRequiredException:
                reset_pagination = True
            else:
                if not response:
                    break

            if reset_pagination or pagination_tracker.has_reached_limit():
                next_page_token = self._get_initial_next_page_token()
                previous_slice = stream_slice
                stream_slice = pagination_tracker.reduce_slice_range_if_possible(
                    stream_slice, original_stream_slice
                )
                LOGGER.info(
                    f"Hitting PaginationReset event. StreamSlice used will go from {previous_slice} to {stream_slice}"
                )
                reset_pagination = False
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response=response,  # type:ignore # we break out of the loop in the try/else when there is no response, so this is safe
                    last_page_size=last_page_size,
                    last_record=last_record,
                    last_page_token_value=last_page_token_value,
                )
                if not next_page_token:
                    break

        # Always return an empty generator just in case no records were ever yielded
        yield from []

    def _get_initial_next_page_token(self) -> Optional[Mapping[str, Any]]:
        initial_token = self._paginator.get_initial_token()
        next_page_token = {"next_page_token": initial_token} if initial_token is not None else None
        return next_page_token

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch a stream's records from an HTTP API source

        :param records_schema: JSON schema describing the records
        :param stream_slice: The stream slice to read data for
        :return: The records read from the API source
        """
        _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check

        record_generator = partial(
            self._parse_records,
            stream_slice=stream_slice,
            records_schema=records_schema,
        )
        yield from self._read_pages(record_generator, _slice)

    def _parse_records(
        self,
        response: Optional[requests.Response],
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice],
    ) -> Iterable[Record]:
        yield from self._parse_response(
            response,
            stream_slice=stream_slice,
            records_schema=records_schema,
        )

    def must_deduplicate_query_params(self) -> bool:
        return True

    @staticmethod
    def _to_partition_key(to_serialize: Any) -> str:
        # separators changed in Python 3.4; to avoid being impacted by further changes, we explicitly specify our own values
        return json.dumps(to_serialize, indent=None, separators=(",", ":"), sort_keys=True)

Retrieves records by synchronously sending requests to fetch records.

The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the stream slicer.

For each stream slice, submit requests until there are no more pages of records to fetch.

This retriever currently inherits from HttpStream to reuse the request submission and pagination machinery. As a result, some of the parameters passed to some methods are unused. The two will be decoupled in a future release.

Attributes:
  • stream_name (str): The stream's name
  • stream_primary_key (Optional[Union[str, List[str], List[List[str]]]]): The stream's primary key
  • requester (Requester): The HTTP requester
  • record_selector (HttpSelector): The record selector
  • paginator (Optional[Paginator]): The paginator
  • stream_slicer (Optional[StreamSlicer]): The stream slicer
  • parameters (Mapping[str, Any]): Additional runtime parameters to be used for string interpolation
SimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None,
    log_formatter: Optional[Callable[[requests.models.Response], Any]] = None,
    pagination_tracker_factory: Callable[[], airbyte_cdk.sources.declarative.retrievers.pagination_tracker.PaginationTracker] = <factory>,
)
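
In practice the surrounding stream drives this orchestration: slicing now happens at the stream level (Retriever.stream_slices is deprecated), and read_records is called once per slice while the retriever loops over pages internally. A rough driver sketch, assuming retriever, schema, and slices were already assembled elsewhere (e.g. by the declarative model factory):

# Hypothetical driver loop around an assembled SimpleRetriever.
for stream_slice in slices:
    # read_records handles pagination itself: it keeps fetching pages
    # until the paginator stops returning a next_page_token.
    for record in retriever.read_records(records_schema=schema, stream_slice=stream_slice):
        print(record)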
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
name: str
Returns

Stream name

primary_key: Union[str, List[str], List[List[str]], NoneType]

The stream's primary key

ignore_stream_slicer_parameters_on_paginated_requests: bool = False
log_formatter: Optional[Callable[[requests.models.Response], Any]] = None
def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
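
Note the None-check in the first line of the method body: an omitted stream_slice is replaced with an empty StreamSlice(partition={}, cursor_slice={}), so a stream with a single, unsliced partition can call the method either way. A small sketch, assuming an already-built retriever:

from airbyte_cdk import StreamSlice

# Both calls are equivalent for a single-partition stream.
records = list(retriever.read_records(records_schema={}))
records_explicit = list(
    retriever.read_records(
        records_schema={},
        stream_slice=StreamSlice(partition={}, cursor_slice={}),
    )
)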

def must_deduplicate_query_params(self) -> bool:
Inherited Members
Retriever
stream_slices
state
@dataclass
class AsyncRetriever(airbyte_cdk.sources.declarative.retrievers.Retriever):
@dataclass
class AsyncRetriever(Retriever):
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    record_selector: RecordSelector
    stream_slicer: AsyncJobPartitionRouter
    slice_logger: AlwaysLogSliceLogger = field(
        init=False,
        default_factory=lambda: AlwaysLogSliceLogger(),
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    @property
    def exit_on_rate_limit(self) -> bool:
        """
        Whether to exit on rate limit. This is a property of the job repository
        and not the stream slicer. The stream slicer is responsible for creating
        the jobs, but the job repository is responsible for managing the rate
        limits and other job-related properties.

        Note:
         - If the `creation_requester` cannot place/create the job, the cause may be rate limiting.
         - If the `creation_requester` can place/create the job, all other requesters should be able
           to complete their work successfully.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is None:
            # Default value when the orchestrator is not available
            return False
        return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit  # type: ignore

    @exit_on_rate_limit.setter
    def exit_on_rate_limit(self, value: bool) -> None:
        """
        Sets the `exit_on_rate_limit` property on the job repository's creation_requester,
        meaning that the job cannot be placed/created if the rate limit is reached.
        Thus no further work on managing jobs is expected to be done.
        """
        job_orchestrator = self.stream_slicer._job_orchestrator
        if job_orchestrator is not None:
            job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value  # type: ignore[attr-defined, assignment]

    def _validate_and_get_stream_slice_jobs(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[AsyncJob]:
        """
        Extracts the async jobs from the given stream_slice.

        Args:
            stream_slice (Optional[StreamSlice]): The stream slice to extract the jobs from.

        Returns:
            Iterable[AsyncJob]: The jobs carried in the stream_slice's extra_fields, or an empty list if no slice is given.
        """
        return stream_slice.extra_fields.get("jobs", []) if stream_slice else []

    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        # emit the slice_descriptor log message, for connector builder TestRead
        yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice)  # type: ignore

        jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
        records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)

        yield from self.record_selector.filter_and_transform(
            all_data=records,
            stream_state={},  # stream_state as an interpolation context is deprecated
            records_schema=records_schema,
            stream_slice=stream_slice,
        )
AsyncRetriever(
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    record_selector: airbyte_cdk.RecordSelector,
    stream_slicer: airbyte_cdk.sources.declarative.partition_routers.AsyncJobPartitionRouter,
)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
record_selector: airbyte_cdk.RecordSelector
exit_on_rate_limit: bool

Whether to exit on rate limit. This is a property of the job repository and not the stream slicer. The stream slicer is responsible for creating the jobs, but the job repository is responsible for managing the rate limits and other job-related properties.

Note:
  • If the creation_requester cannot place/create the job, the cause may be rate limiting.
  • If the creation_requester can place/create the job, all other requesters should be able to complete their work successfully.
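
Since the setter forwards straight to the job repository's creation requester, callers can toggle the behavior on the retriever itself. A minimal sketch, assuming async_retriever is an already-assembled AsyncRetriever:

# Fail fast on job creation once the API rate limit is hit,
# instead of waiting for the limit to clear.
async_retriever.exit_on_rate_limit = True
assert async_retriever.exit_on_rate_limit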
def read_records(self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source
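
The jobs to fetch travel on the slice itself: _validate_and_get_stream_slice_jobs reads them from stream_slice.extra_fields["jobs"]. A sketch of the slice shape this method expects, assuming completed_jobs is a list of AsyncJob instances produced by the AsyncJobPartitionRouter:

from airbyte_cdk import StreamSlice

# Hypothetical slice as produced by the async partition router.
stream_slice = StreamSlice(
    partition={},
    cursor_slice={},
    extra_fields={"jobs": completed_jobs},
)
for record in async_retriever.read_records(records_schema={}, stream_slice=stream_slice):
    print(record)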

Inherited Members
Retriever
stream_slices
state
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class LazySimpleRetriever(airbyte_cdk.sources.declarative.retrievers.SimpleRetriever):
@deprecated(
    "This class is experimental. Use at your own risk.",
    category=ExperimentalClassWarning,
)
@dataclass
class LazySimpleRetriever(SimpleRetriever):
    """
    A retriever that supports lazy loading from parent streams.
    """

    def _read_pages(
        self,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        response = stream_slice.extra_fields["child_response"]
        if response:
            last_page_size, last_record = 0, None
            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            next_page_token = self._next_page_token(response, last_page_size, last_record, None)
            if next_page_token:
                yield from self._paginate(
                    next_page_token,
                    records_generator_fn,
                    stream_slice,
                )

            yield from []
        else:
            # coderabbit detected an interesting bug/gap where if we were to not get a child_response, we
            # might recurse forever. This might not be the case, but it is worth noting that this code path
            # isn't comprehensively tested.
            yield from self._read_pages(records_generator_fn, stream_slice)

    def _paginate(
        self,
        next_page_token: Any,
        records_generator_fn: Callable[[Optional[requests.Response]], Iterable[Record]],
        stream_slice: StreamSlice,
    ) -> Iterable[Record]:
        """Handle pagination by fetching subsequent pages."""
        pagination_complete = False

        while not pagination_complete:
            response = self._fetch_next_page(stream_slice, next_page_token)
            last_page_size, last_record = 0, None

            for record in records_generator_fn(response):  # type: ignore[call-arg] # only _parse_records expected as a func
                last_page_size += 1
                last_record = record
                yield record

            if not response:
                pagination_complete = True
            else:
                last_page_token_value = (
                    next_page_token.get("next_page_token") if next_page_token else None
                )
                next_page_token = self._next_page_token(
                    response, last_page_size, last_record, last_page_token_value
                )

                if not next_page_token:
                    pagination_complete = True

A retriever that supports lazy loading from parent streams.

LazySimpleRetriever(
    requester: airbyte_cdk.Requester,
    record_selector: airbyte_cdk.sources.declarative.extractors.HttpSelector,
    config: Mapping[str, Any],
    parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]],
    name: str = <property object>,
    primary_key: Union[str, List[str], List[List[str]], NoneType] = <property object>,
    paginator: Optional[airbyte_cdk.sources.declarative.requesters.paginators.Paginator] = None,
    stream_slicer: airbyte_cdk.sources.declarative.stream_slicers.StreamSlicer = <factory>,
    request_option_provider: airbyte_cdk.sources.declarative.requesters.request_options.RequestOptionsProvider = <factory>,
    ignore_stream_slicer_parameters_on_paginated_requests: bool = False,
    additional_query_properties: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.QueryProperties] = None,
    log_formatter: Optional[Callable[[requests.models.Response], Any]] = None,
    pagination_tracker_factory: Callable[[], airbyte_cdk.sources.declarative.retrievers.pagination_tracker.PaginationTracker] = <factory>,
)
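
The lazy behavior hinges on _read_pages reading the first page from stream_slice.extra_fields["child_response"] instead of issuing a request: the parent stream's response is parsed first, and the retriever only paginates for any remaining pages. A sketch of the slice it consumes, assuming child_response is the requests.Response already captured while reading the parent stream and lazy_retriever an assembled LazySimpleRetriever:

from airbyte_cdk import StreamSlice

# Hypothetical slice carrying the parent stream's response.
stream_slice = StreamSlice(
    partition={"parent_id": "123"},
    cursor_slice={},
    extra_fields={"child_response": child_response},
)
records = list(lazy_retriever.read_records(records_schema={}, stream_slice=stream_slice))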