airbyte_cdk.sources.source

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5
 6import logging
 7from abc import ABC, abstractmethod
 8from typing import Any, Generic, Iterable, List, Mapping, Optional, TypeVar
 9
10from airbyte_cdk.connector import BaseConnector, DefaultConnectorMixin, TConfig
11from airbyte_cdk.models import (
12    AirbyteCatalog,
13    AirbyteMessage,
14    AirbyteStateMessage,
15    AirbyteStateMessageSerializer,
16    ConfiguredAirbyteCatalog,
17    ConfiguredAirbyteCatalogSerializer,
18)
19
20TState = TypeVar("TState")
21TCatalog = TypeVar("TCatalog")
22
23
class ExperimentalClassWarning(DeprecationWarning):
    """
    Dedicated warning category that subclasses the built-in DeprecationWarning without adding any behavior.
    Judging by its name, it is meant for flagging classes that are still experimental.

    The explicit docstring keeps documentation tools from falling back to the inherited DeprecationWarning description.
    """
26
27
class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
    """
    Abstract interface of a source, generic over its config (TConfig), state (TState) and catalog (TCatalog) types.
    Concrete subclasses have to implement read_state, read_catalog, read and discover.
    """

    @abstractmethod
    def read_state(self, state_path: str) -> TState:
        """
        Returns the state (of type TState) obtained from the given state path.
        """

    @abstractmethod
    def read_catalog(self, catalog_path: str) -> TCatalog:
        """
        Returns the catalog (of type TCatalog) obtained from the given catalog path.
        """

    @abstractmethod
    def read(
        self,
        logger: logging.Logger,
        config: TConfig,
        catalog: TCatalog,
        state: Optional[TState] = None,
    ) -> Iterable[AirbyteMessage]:
        """
        Reads the source with the supplied configuration, catalog and (optional) state, and returns the AirbyteMessages
        produced by that read.
        """

    @abstractmethod
    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
        """
        Describes the streams and fields available in this integration as an AirbyteCatalog. With valid credentials for a
        Postgres database, for instance, every table becomes a stream and every column of a table becomes a field.
        """
53
54
class Source(
    DefaultConnectorMixin,
    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
    ABC,
):
    """
    Abstract source whose config is a Mapping[str, Any], whose state is a list of AirbyteStateMessage and whose catalog is a
    ConfiguredAirbyteCatalog. It supplies read_state, read_catalog and name; it does not implement read or discover itself.
    """

    # can be overridden to change an input state.
    @classmethod
    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
        """
        Retrieves the input state of a sync by reading from the specified JSON file. The file content must be a list of
        serialized AirbyteStateMessages; every entry is deserialized and returned in its original order.
        :param state_path: The filepath to where the stream states are located. A falsy path yields an empty list.
        :return: The parsed state messages, or an empty list when there is no path or the file content is empty
        :raises ValueError: If the file content is not a list, or if a message has none of its stream, data or global
            fields set
        """
        if not state_path:
            return []

        state_obj = BaseConnector._read_json_file(state_path)
        if not state_obj:
            return []
        if not isinstance(state_obj, list):
            # Iterating a JSON object would walk its keys instead of state messages, so reject it up front
            # with a clear error rather than letting the per-entry deserialization fail on a key.
            raise ValueError(
                f"Input state should be a list of AirbyteStateMessages, got {type(state_obj).__name__}"
            )

        parsed_state_messages = []
        for state in state_obj:
            parsed_message = AirbyteStateMessageSerializer.load(state)
            # A usable message carries at least one of the three payload fields.
            if not (parsed_message.stream or parsed_message.data or parsed_message.global_):
                raise ValueError(
                    "AirbyteStateMessage should contain either a stream, global, or state field"
                )
            parsed_state_messages.append(parsed_message)
        return parsed_state_messages

    # can be overridden to change an input catalog
    @classmethod
    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
        """
        Loads the content returned by _read_json_file(catalog_path) into a ConfiguredAirbyteCatalog.
        :param catalog_path: The filepath handed to _read_json_file
        :return: The deserialized ConfiguredAirbyteCatalog
        """
        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))

    @property
    def name(self) -> str:
        """Source name, taken from the name of the instance's class"""
        return self.__class__.__name__
class ExperimentalClassWarning(builtins.DeprecationWarning):
25class ExperimentalClassWarning(DeprecationWarning):
26    pass

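Warning category that subclasses the built-in DeprecationWarning (the base class for warnings about deprecated features) and adds no behavior of its own. Judging by its name, it is meant for flagging experimental classes.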

class BaseSource(airbyte_cdk.connector.BaseConnector[~TConfig], abc.ABC, typing.Generic[~TConfig, ~TState, ~TCatalog]):
29class BaseSource(BaseConnector[TConfig], ABC, Generic[TConfig, TState, TCatalog]):
30    @abstractmethod
31    def read_state(self, state_path: str) -> TState: ...
32
33    @abstractmethod
34    def read_catalog(self, catalog_path: str) -> TCatalog: ...
35
36    @abstractmethod
37    def read(
38        self,
39        logger: logging.Logger,
40        config: TConfig,
41        catalog: TCatalog,
42        state: Optional[TState] = None,
43    ) -> Iterable[AirbyteMessage]:
44        """
45        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
46        """
47
48    @abstractmethod
49    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
50        """
51        Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
52        Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
53        """

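Abstract base class for sources, generic over the config (TConfig), state (TState) and catalog (TCatalog) types. Subclasses must implement read_state, read_catalog, read and discover.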

@abstractmethod
def read_state(self, state_path: str) -> ~TState:
30    @abstractmethod
31    def read_state(self, state_path: str) -> TState: ...
@abstractmethod
def read_catalog(self, catalog_path: str) -> ~TCatalog:
33    @abstractmethod
34    def read_catalog(self, catalog_path: str) -> TCatalog: ...
@abstractmethod
def read( self, logger: logging.Logger, config: ~TConfig, catalog: ~TCatalog, state: Optional[~TState] = None) -> Iterable[airbyte_cdk.AirbyteMessage]:
36    @abstractmethod
37    def read(
38        self,
39        logger: logging.Logger,
40        config: TConfig,
41        catalog: TCatalog,
42        state: Optional[TState] = None,
43    ) -> Iterable[AirbyteMessage]:
44        """
45        Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.
46        """

Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, catalog, and state.

@abstractmethod
def discover( self, logger: logging.Logger, config: ~TConfig) -> airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteCatalog:
48    @abstractmethod
49    def discover(self, logger: logging.Logger, config: TConfig) -> AirbyteCatalog:
50        """
51        Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a
52        Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
53        """

Returns an AirbyteCatalog representing the available streams and fields in this integration. For example, given valid credentials to a Postgres database, returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.

56class Source(
57    DefaultConnectorMixin,
58    BaseSource[Mapping[str, Any], List[AirbyteStateMessage], ConfiguredAirbyteCatalog],
59    ABC,
60):
61    # can be overridden to change an input state.
62    @classmethod
63    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
64        """
65        Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either
66        a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format. Regardless of the
67        incoming input type, it will always be transformed and output as a list of AirbyteStateMessage(s).
68        :param state_path: The filepath to where the stream states are located
69        :return: The complete stream state based on the connector's previous sync
70        """
71        parsed_state_messages = []
72        if state_path:
73            state_obj = BaseConnector._read_json_file(state_path)
74            if state_obj:
75                for state in state_obj:  # type: ignore  # `isinstance(state_obj, List)` ensures that this is a list
76                    parsed_message = AirbyteStateMessageSerializer.load(state)
77                    if (
78                        not parsed_message.stream
79                        and not parsed_message.data
80                        and not parsed_message.global_
81                    ):
82                        raise ValueError(
83                            "AirbyteStateMessage should contain either a stream, global, or state field"
84                        )
85                    parsed_state_messages.append(parsed_message)
86        return parsed_state_messages
87
88    # can be overridden to change an input catalog
89    @classmethod
90    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
91        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))
92
93    @property
94    def name(self) -> str:
95        """Source name"""
96        return self.__class__.__name__

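Abstract source that fixes the config type to Mapping[str, Any], the state type to List[AirbyteStateMessage] and the catalog type to ConfiguredAirbyteCatalog. It provides read_state, read_catalog and name.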

@classmethod
def read_state( cls, state_path: str) -> List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]:
62    @classmethod
63    def read_state(cls, state_path: str) -> List[AirbyteStateMessage]:
64        """
65        Retrieves the input state of a sync by reading from the specified JSON file. Incoming state can be deserialized into either
66        a JSON object for legacy state input or as a list of AirbyteStateMessages for the per-stream state format. Regardless of the
67        incoming input type, it will always be transformed and output as a list of AirbyteStateMessage(s).
68        :param state_path: The filepath to where the stream states are located
69        :return: The complete stream state based on the connector's previous sync
70        """
71        parsed_state_messages = []
72        if state_path:
73            state_obj = BaseConnector._read_json_file(state_path)
74            if state_obj:
75                for state in state_obj:  # type: ignore  # `isinstance(state_obj, List)` ensures that this is a list
76                    parsed_message = AirbyteStateMessageSerializer.load(state)
77                    if (
78                        not parsed_message.stream
79                        and not parsed_message.data
80                        and not parsed_message.global_
81                    ):
82                        raise ValueError(
83                            "AirbyteStateMessage should contain either a stream, global, or state field"
84                        )
85                    parsed_state_messages.append(parsed_message)
86        return parsed_state_messages

Retrieves the input state of a sync by reading from the specified JSON file. The file content is iterated entry by entry, and each entry is deserialized into an AirbyteStateMessage; a message with none of its stream, data or global fields set raises a ValueError. The result is always a list of AirbyteStateMessage(s), which is empty when no path is given or the file content is empty.

Parameters
  • state_path: The filepath to where the stream states are located
Returns

The complete stream state based on the connector's previous sync

@classmethod
def read_catalog( cls, catalog_path: str) -> airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog:
89    @classmethod
90    def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
91        return ConfiguredAirbyteCatalogSerializer.load(cls._read_json_file(catalog_path))
name: str
93    @property
94    def name(self) -> str:
95        """Source name"""
96        return self.__class__.__name__

Source name