airbyte_cdk.sources.declarative.retrievers.retriever

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from abc import abstractmethod
 6from typing import Any, Iterable, Mapping, Optional
 7
 8from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import StreamSlice
 9from airbyte_cdk.sources.streams.core import StreamData
10from airbyte_cdk.sources.types import StreamState
11
12
13class Retriever:
14    """
15    Responsible for fetching a stream's records from an HTTP API source.
16    """
17
18    @abstractmethod
19    def read_records(
20        self,
21        records_schema: Mapping[str, Any],
22        stream_slice: Optional[StreamSlice] = None,
23    ) -> Iterable[StreamData]:
24        """
25        Fetch a stream's records from an HTTP API source.
26
27        :param records_schema: JSON schema describing the records
28        :param stream_slice: The stream slice to read data for
29        :return: The records read from the API source
30        """
31
32    @abstractmethod
33    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
34        """Return the stream slices to read; each slice may be None."""
35
36    @property
37    @abstractmethod
38    def state(self) -> StreamState:
39        """State getter; should return state in a form that can be serialized to a string and sent to the output
40        as a STATE AirbyteMessage.
41
42        A good example of a state is a cursor_value:
43            {
44                self.cursor_field: "cursor_value"
45            }
46
47        State should be as small as possible while still being descriptive enough to restore
48        the syncing process from the point where it stopped.
49        """
50
51    @state.setter
52    @abstractmethod
53    def state(self, value: StreamState) -> None:
54        """State setter; accepts state as serialized by the state getter."""
class Retriever:
14class Retriever:
15    """
16    Responsible for fetching a stream's records from an HTTP API source.
17    """
18
19    @abstractmethod
20    def read_records(
21        self,
22        records_schema: Mapping[str, Any],
23        stream_slice: Optional[StreamSlice] = None,
24    ) -> Iterable[StreamData]:
25        """
26        Fetch a stream's records from an HTTP API source.
27
28        :param records_schema: JSON schema describing the records
29        :param stream_slice: The stream slice to read data for
30        :return: The records read from the API source
31        """
32
33    @abstractmethod
34    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
35        """Return the stream slices to read; each slice may be None."""
36
37    @property
38    @abstractmethod
39    def state(self) -> StreamState:
40        """State getter; should return state in a form that can be serialized to a string and sent to the output
41        as a STATE AirbyteMessage.
42
43        A good example of a state is a cursor_value:
44            {
45                self.cursor_field: "cursor_value"
46            }
47
48        State should be as small as possible while still being descriptive enough to restore
49        the syncing process from the point where it stopped.
50        """
51
52    @state.setter
53    @abstractmethod
54    def state(self, value: StreamState) -> None:
55        """State setter; accepts state as serialized by the state getter."""

Responsible for fetching a stream's records from an HTTP API source.

@abstractmethod
def read_records( self, records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[Union[Mapping[str, Any], airbyte_cdk.AirbyteMessage]]:
19    @abstractmethod
20    def read_records(
21        self,
22        records_schema: Mapping[str, Any],
23        stream_slice: Optional[StreamSlice] = None,
24    ) -> Iterable[StreamData]:
25        """
26        Fetch a stream's records from an HTTP API source.
27
28        :param records_schema: JSON schema describing the records
29        :param stream_slice: The stream slice to read data for
30        :return: The records read from the API source
31        """

Fetch a stream's records from an HTTP API source

Parameters
  • records_schema: JSON schema describing the records
  • stream_slice: The stream slice to read data for
Returns

The records read from the API source

@abstractmethod
def stream_slices(self) -> Iterable[Optional[airbyte_cdk.StreamSlice]]:
33    @abstractmethod
34    def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
35        """Return the stream slices to read; each slice may be None."""

Returns the stream slices

state: Mapping[str, Any]
37    @property
38    @abstractmethod
39    def state(self) -> StreamState:
40        """State getter; should return state in a form that can be serialized to a string and sent to the output
41        as a STATE AirbyteMessage.
42
43        A good example of a state is a cursor_value:
44            {
45                self.cursor_field: "cursor_value"
46            }
47
48        State should be as small as possible while still being descriptive enough to restore
49        the syncing process from the point where it stopped.
50        """

State getter; should return state in a form that can be serialized to a string and sent to the output as a STATE AirbyteMessage.

A good example of a state is a cursor_value: { self.cursor_field: "cursor_value" }

State should be as small as possible while still being descriptive enough to restore the syncing process from the point where it stopped.