airbyte_cdk.sources.declarative.retrievers.retriever

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import abstractmethod
 6from typing import Any, Iterable, Mapping, Optional
 7
 8from typing_extensions import deprecated
 9
10from airbyte_cdk.sources.streams.core import StreamData
11from airbyte_cdk.sources.types import StreamSlice, StreamState
12
13
class Retriever:
    """
    Contract for objects that fetch a stream's records from an HTTP API source.

    Every member below is marked ``@abstractmethod`` and has a docstring-only body;
    implementations are expected to override them. The slicing and state members
    additionally carry ``@deprecated`` markers.

    NOTE(review): as declared here the class has no ``abc.ABC`` base or ``ABCMeta``
    metaclass, so Python will not refuse to instantiate a subclass that leaves a
    member unimplemented -- confirm whether that is intentional.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch the records of a stream from an HTTP API source.

        :param records_schema: JSON schema describing a record
        :param stream_slice: The stream slice to read data for; defaults to ``None``
        :return: An iterable of the records read from the API source
        """

    @abstractmethod
    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Return the stream slices."""

    @property
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """Return the current state.

        The state should be in a form that can be serialized to a string and sent to the
        output as a STATE AirbyteMessage.

        A good example of a state is a cursor value:
            {
                self.cursor_field: "cursor_value"
            }

        The state should be as small as possible while remaining descriptive enough to
        resume the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self, value: StreamState) -> None:
        """Set the state; accepts a state as serialized by the state getter."""
class Retriever:
class Retriever:
    """
    Contract for objects that fetch a stream's records from an HTTP API source.

    Every member below is marked ``@abstractmethod`` and has a docstring-only body;
    implementations are expected to override them. The slicing and state members
    additionally carry ``@deprecated`` markers.

    NOTE(review): as declared here the class has no ``abc.ABC`` base or ``ABCMeta``
    metaclass, so Python will not refuse to instantiate a subclass that leaves a
    member unimplemented -- confirm whether that is intentional.
    """

    @abstractmethod
    def read_records(
        self,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[StreamData]:
        """
        Fetch the records of a stream from an HTTP API source.

        :param records_schema: JSON schema describing a record
        :param stream_slice: The stream slice to read data for; defaults to ``None``
        :return: An iterable of the records read from the API source
        """

    @abstractmethod
    @deprecated("Stream slicing is being moved to the stream level.")
    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
        """Return the stream slices."""

    @property
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self) -> StreamState:
        """Return the current state.

        The state should be in a form that can be serialized to a string and sent to the
        output as a STATE AirbyteMessage.

        A good example of a state is a cursor value:
            {
                self.cursor_field: "cursor_value"
            }

        The state should be as small as possible while remaining descriptive enough to
        resume the syncing process from the point where it stopped.
        """

    @state.setter
    @abstractmethod
    @deprecated("State management is being moved to the stream level.")
    def state(self, value: StreamState) -> None:
        """Set the state; accepts a state as serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
20    @abstractmethod
21    def read_records(
22        self,
23        records_schema: Mapping[str, Any],
24        stream_slice: Optional[StreamSlice] = None,
25    ) -> Iterable[StreamData]:
26        """
27        Fetch a stream's records from an HTTP API source
28
29        :param records_schema: json schema to describe record
30        :param stream_slice: The stream slice to read data for
31        :return: The records read from the API source
32        """

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: json schema to describe record
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
@deprecated('Stream slicing is being moved to the stream level.')
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
34    @abstractmethod
35    @deprecated("Stream slicing is being moved to the stream level.")
36    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
37        """Returns the stream slices"""

Returns the stream slices

state: Mapping[str, Any]
39    @property
40    @abstractmethod
41    @deprecated("State management is being moved to the stream level.")
42    def state(self) -> StreamState:
43        """State getter, should return state in form that can serialized to a string and send to the output
44        as a STATE AirbyteMessage.
45
46        A good example of a state is a cursor_value:
47            {
48                self.cursor_field: "cursor_value"
49            }
50
51         State should try to be as small as possible but at the same time descriptive enough to restore
52         syncing process from the point where it stopped.
53        """

State getter; should return the state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should try to be as small as possible but at the same time descriptive enough to restore syncing process from the point where it stopped.