airbyte_cdk.sources.declarative.checks

 1#
 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
11from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
12from airbyte_cdk.sources.declarative.models import (
13    CheckDynamicStream as CheckDynamicStreamModel,
14)
15from airbyte_cdk.sources.declarative.models import (
16    CheckStream as CheckStreamModel,
17)
18
19COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
20    "CheckStream": CheckStreamModel,
21    "CheckDynamicStream": CheckDynamicStreamModel,
22}
23
24__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
@dataclass
class CheckStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
@dataclass
class CheckStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or more streams selected by the developer.

    Attributes:
        stream_names (List[str]): names of the streams to check; each must be the name of a
            stream returned by the source, otherwise ``check_connection`` raises ``ValueError``
    """

    stream_names: List[str]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Check that every stream named in ``stream_names`` is available.

        :param source: source whose streams are looked up via ``source.streams(config=config)``
        :param logger: logger passed to the availability check and used to log unexpected errors
        :param config: the user-provided configuration, forwarded to ``source.streams``
        :return: ``(True, None)`` when every listed stream is available; otherwise ``(False, reason)``
          for the first stream that is unavailable or whose availability check raised, or
          ``(False, message)`` when the source returns no streams at all
        :raises ValueError: if a name in ``stream_names`` is not one of the source's streams
        """
        streams = source.streams(config=config)
        if len(streams) == 0:
            return False, f"No streams to connect to from source {source}"

        stream_name_to_stream = {s.name: s for s in streams}
        # Created once and reused for every stream instead of being rebuilt on each iteration.
        availability_strategy = HttpAvailabilityStrategy()
        for stream_name in self.stream_names:
            if stream_name not in stream_name_to_stream:
                raise ValueError(
                    f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
                )

            stream = stream_name_to_stream[stream_name]
            # Only the availability call itself is guarded; any error it raises is reported
            # to the caller as a failed check rather than propagated.
            try:
                stream_is_available, reason = availability_strategy.check_availability(
                    stream, logger
                )
            except Exception as error:
                logger.error(
                    f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
                )
                return False, f"Unable to connect to stream {stream_name} - {error}"
            if not stream_is_available:
                return False, reason
        return True, None

Checks the connection by checking the availability of one or more streams selected by the developer.

Attributes:
  • stream_names (List[str]): names of the streams to check
CheckStream( stream_names: List[str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_names: List[str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
31    def check_connection(
32        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
33    ) -> Tuple[bool, Any]:
34        streams = source.streams(config=config)
35        stream_name_to_stream = {s.name: s for s in streams}
36        if len(streams) == 0:
37            return False, f"No streams to connect to from source {source}"
38        for stream_name in self.stream_names:
39            if stream_name not in stream_name_to_stream.keys():
40                raise ValueError(
41                    f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
42                )
43
44            stream = stream_name_to_stream[stream_name]
45            availability_strategy = HttpAvailabilityStrategy()
46            try:
47                stream_is_available, reason = availability_strategy.check_availability(
48                    stream, logger
49                )
50                if not stream_is_available:
51                    return False, reason
52            except Exception as error:
53                logger.error(
54                    f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
55                )
56                return False, f"Unable to connect to stream {stream_name} - {error}"
57        return True, None

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration, as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

@dataclass
class CheckDynamicStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
@dataclass
class CheckDynamicStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or more dynamic streams.

    Attributes:
        stream_count (int): number of streams to check, taken from the start of the list
            returned by the source
        use_check_availability (bool): when False, the check only verifies that the source
            returns at least one stream and skips the per-stream availability check
    """

    # TODO: Add field stream_names to check_connection for static streams
    #  https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

    stream_count: int
    parameters: InitVar[Mapping[str, Any]]
    use_check_availability: bool = True

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Check the availability of the first ``stream_count`` streams returned by the source.

        :param source: source whose streams are obtained via ``source.streams(config=config)``
        :param logger: logger passed to the availability check and used to report failures
        :param config: the user-provided configuration, forwarded to ``source.streams``
        :return: ``(False, message)`` when the source returns no streams; ``(True, None)`` when
          ``use_check_availability`` is False or every checked stream is available; otherwise
          ``(False, reason)`` for the first stream that is unavailable or whose check raised
        """
        streams = source.streams(config=config)

        if len(streams) == 0:
            return False, f"No streams to connect to from source {source}"
        if not self.use_check_availability:
            return True, None

        availability_strategy = HttpAvailabilityStrategy()

        for stream in streams[: min(self.stream_count, len(streams))]:
            # The try guards only the availability call, so `stream` is always bound when the
            # handler reads `stream.name`. Previously the try wrapped the whole loop, and an
            # error raised before the first iteration (e.g. a non-int stream_count breaking
            # the slice) surfaced as an UnboundLocalError that hid the real cause.
            try:
                stream_is_available, reason = availability_strategy.check_availability(
                    stream, logger
                )
            except Exception as error:
                error_message = (
                    f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
                )
                logger.error(error_message, exc_info=True)
                return False, error_message
            if not stream_is_available:
                logger.warning(f"Stream {stream.name} is not available: {reason}")
                return False, reason

        return True, None

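Checks the connection by checking the availability of one or more dynamic streams.

Attributes:
  • stream_count (int): number of streams to check
  • use_check_availability (bool): if False, the check only verifies that the source returns at least one stream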
CheckDynamicStream( stream_count: int, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], use_check_availability: bool = True)
stream_count: int
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
use_check_availability: bool = True
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
35    def check_connection(
36        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
37    ) -> Tuple[bool, Any]:
38        streams = source.streams(config=config)
39
40        if len(streams) == 0:
41            return False, f"No streams to connect to from source {source}"
42        if not self.use_check_availability:
43            return True, None
44
45        availability_strategy = HttpAvailabilityStrategy()
46
47        try:
48            for stream in streams[: min(self.stream_count, len(streams))]:
49                stream_is_available, reason = availability_strategy.check_availability(
50                    stream, logger
51                )
52                if not stream_is_available:
53                    logger.warning(f"Stream {stream.name} is not available: {reason}")
54                    return False, reason
55        except Exception as error:
56            error_message = (
57                f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
58            )
59            logger.error(error_message, exc_info=True)
60            return False, error_message
61
62        return True, None

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration, as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

class ConnectionChecker(abc.ABC):
class ConnectionChecker(ABC):
    """
    Abstract base class for checking a connection.

    Concrete checkers subclass this and implement ``check_connection``.
    """

    @abstractmethod
    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Tests whether the input configuration can be used to successfully connect to the integration,
        e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

        :param source: source
        :param logger: source logger
        :param config: the user-provided configuration, as specified by the source's spec.
          This usually contains the information required to check the connection, e.g. tokens, secrets and keys.
        :return: a tuple of (boolean, error). If the boolean is True, the connection check succeeded
          and the underlying data source can be reached using the provided configuration.
          Otherwise, the configuration cannot be used to connect to the underlying data source,
          and the "error" object should describe what went wrong.
          The error object will be cast to a string to display the problem to the user.
        """
        ...

Abstract base class for checking a connection

@abstractmethod
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
18    @abstractmethod
19    def check_connection(
20        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
21    ) -> Tuple[bool, Any]:
22        """
23        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
24        to the Stripe API.
25
26        :param source: source
27        :param logger: source logger
28        :param config: The user-provided configuration as specified by the source's spec.
29          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
30        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
31          and we can connect to the underlying data source using the provided configuration.
32          Otherwise, the input config cannot be used to connect to the underlying data source,
33          and the "error" object should describe what went wrong.
34          The error object will be cast to string to display the problem to the user.
35        """
36        pass

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration, as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.