airbyte_cdk.sources.declarative.checks

 1#
 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10from airbyte_cdk.sources.declarative.checks.check_stream import (
11    CheckStream,
12    DynamicStreamCheckConfig,
13)
14from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
15from airbyte_cdk.sources.declarative.models import (
16    CheckDynamicStream as CheckDynamicStreamModel,
17)
18from airbyte_cdk.sources.declarative.models import (
19    CheckStream as CheckStreamModel,
20)
21
22COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
23    "CheckStream": CheckStreamModel,
24    "CheckDynamicStream": CheckDynamicStreamModel,
25}
26
27__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
@dataclass
class CheckStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
 26@dataclass
 27class CheckStream(ConnectionChecker):
 28    """
 29    Checks the connections by checking availability of one or many streams selected by the developer
 30
 31    Attributes:
 32        stream_names (List[str]): names of streams to check
 33    """
 34
 35    stream_names: List[str]
 36    parameters: InitVar[Mapping[str, Any]]
 37    dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
 38
 39    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 40        self._parameters = parameters
 41        if self.dynamic_streams_check_configs is None:
 42            self.dynamic_streams_check_configs = []
 43
 44    def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
 45        """Logs an error and returns a formatted error message."""
 46        error_message = f"Encountered an error while {action}. Error: {error}"
 47        logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
 48        return False, error_message
 49
 50    def check_connection(
 51        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
 52    ) -> Tuple[bool, Any]:
 53        """Checks the connection to the source and its streams."""
 54        try:
 55            streams = source.streams(config=config)
 56            if not streams:
 57                return False, f"No streams to connect to from source {source}"
 58        except Exception as error:
 59            return self._log_error(logger, "discovering streams", error)
 60
 61        stream_name_to_stream = {s.name: s for s in streams}
 62        for stream_name in self.stream_names:
 63            if stream_name not in stream_name_to_stream:
 64                raise ValueError(
 65                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
 66                )
 67
 68            stream_availability, message = self._check_stream_availability(
 69                stream_name_to_stream, stream_name, logger
 70            )
 71            if not stream_availability:
 72                return stream_availability, message
 73
 74        should_check_dynamic_streams = (
 75            hasattr(source, "resolved_manifest")
 76            and hasattr(source, "dynamic_streams")
 77            and self.dynamic_streams_check_configs
 78        )
 79
 80        if should_check_dynamic_streams:
 81            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
 82
 83        return True, None
 84
 85    def _check_stream_availability(
 86        self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
 87    ) -> Tuple[bool, Any]:
 88        """Checks if streams are available."""
 89        availability_strategy = HttpAvailabilityStrategy()
 90        try:
 91            stream = stream_name_to_stream[stream_name]
 92            stream_is_available, reason = availability_strategy.check_availability(stream, logger)
 93            if not stream_is_available:
 94                message = f"Stream {stream_name} is not available: {reason}"
 95                logger.warning(message)
 96                return stream_is_available, message
 97        except Exception as error:
 98            return self._log_error(logger, f"checking availability of stream {stream_name}", error)
 99        return True, None
100
101    def _check_dynamic_streams_availability(
102        self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
103    ) -> Tuple[bool, Any]:
104        """Checks the availability of dynamic streams."""
105        dynamic_streams = source.resolved_manifest.get("dynamic_streams", [])  # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
106        dynamic_stream_name_to_dynamic_stream = {
107            ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
108        }
109        generated_streams = self._map_generated_streams(source.dynamic_streams)  # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
110
111        for check_config in self.dynamic_streams_check_configs:  # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
112            if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
113                return (
114                    False,
115                    f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
116                )
117
118            generated = generated_streams.get(check_config.dynamic_stream_name, [])
119            stream_availability, message = self._check_generated_streams_availability(
120                generated, stream_name_to_stream, logger, check_config.stream_count
121            )
122            if not stream_availability:
123                return stream_availability, message
124
125        return True, None
126
127    def _map_generated_streams(
128        self, dynamic_streams: List[Dict[str, Any]]
129    ) -> Dict[str, List[Dict[str, Any]]]:
130        """Maps dynamic stream names to their corresponding generated streams."""
131        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
132        for stream in dynamic_streams:
133            mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
134        return mapped_streams
135
136    def _check_generated_streams_availability(
137        self,
138        generated_streams: List[Dict[str, Any]],
139        stream_name_to_stream: Dict[str, Any],
140        logger: logging.Logger,
141        max_count: int,
142    ) -> Tuple[bool, Any]:
143        """Checks availability of generated dynamic streams."""
144        availability_strategy = HttpAvailabilityStrategy()
145        for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
146            stream = stream_name_to_stream[declarative_stream["name"]]
147            try:
148                stream_is_available, reason = availability_strategy.check_availability(
149                    stream, logger
150                )
151                if not stream_is_available:
152                    message = f"Dynamic Stream {stream.name} is not available: {reason}"
153                    logger.warning(message)
154                    return False, message
155            except Exception as error:
156                return self._log_error(
157                    logger, f"checking availability of dynamic stream {stream.name}", error
158                )
159        return True, None

Checks the connections by checking availability of one or many streams selected by the developer

Attributes:
  • stream_names (List[str]): names of streams to check
CheckStream( stream_names: List[str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None)
stream_names: List[str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
50    def check_connection(
51        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
52    ) -> Tuple[bool, Any]:
53        """Checks the connection to the source and its streams."""
54        try:
55            streams = source.streams(config=config)
56            if not streams:
57                return False, f"No streams to connect to from source {source}"
58        except Exception as error:
59            return self._log_error(logger, "discovering streams", error)
60
61        stream_name_to_stream = {s.name: s for s in streams}
62        for stream_name in self.stream_names:
63            if stream_name not in stream_name_to_stream:
64                raise ValueError(
65                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
66                )
67
68            stream_availability, message = self._check_stream_availability(
69                stream_name_to_stream, stream_name, logger
70            )
71            if not stream_availability:
72                return stream_availability, message
73
74        should_check_dynamic_streams = (
75            hasattr(source, "resolved_manifest")
76            and hasattr(source, "dynamic_streams")
77            and self.dynamic_streams_check_configs
78        )
79
80        if should_check_dynamic_streams:
81            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
82
83        return True, None

Checks the connection to the source and its streams.

@dataclass
class CheckDynamicStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
16@dataclass
17class CheckDynamicStream(ConnectionChecker):
18    """
19    Checks the connections by checking availability of one or many dynamic streams
20
21    Attributes:
 22        stream_count (int): number of streams to check
23    """
24
25    # TODO: Add field stream_names to check_connection for static streams
26    #  https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483
27
28    stream_count: int
29    parameters: InitVar[Mapping[str, Any]]
30    use_check_availability: bool = True
31
32    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
33        self._parameters = parameters
34
35    def check_connection(
36        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
37    ) -> Tuple[bool, Any]:
38        streams = source.streams(config=config)
39
40        if len(streams) == 0:
41            return False, f"No streams to connect to from source {source}"
42        if not self.use_check_availability:
43            return True, None
44
45        availability_strategy = HttpAvailabilityStrategy()
46
47        try:
48            for stream in streams[: min(self.stream_count, len(streams))]:
49                stream_is_available, reason = availability_strategy.check_availability(
50                    stream, logger
51                )
52                if not stream_is_available:
53                    logger.warning(f"Stream {stream.name} is not available: {reason}")
54                    return False, reason
55        except Exception as error:
56            error_message = (
57                f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
58            )
59            logger.error(error_message, exc_info=True)
60            return False, error_message
61
62        return True, None

Checks the connections by checking availability of one or many dynamic streams

Attributes:
  • stream_count (int): number of streams to check
CheckDynamicStream( stream_count: int, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], use_check_availability: bool = True)
stream_count: int
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
use_check_availability: bool = True
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
35    def check_connection(
36        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
37    ) -> Tuple[bool, Any]:
38        streams = source.streams(config=config)
39
40        if len(streams) == 0:
41            return False, f"No streams to connect to from source {source}"
42        if not self.use_check_availability:
43            return True, None
44
45        availability_strategy = HttpAvailabilityStrategy()
46
47        try:
48            for stream in streams[: min(self.stream_count, len(streams))]:
49                stream_is_available, reason = availability_strategy.check_availability(
50                    stream, logger
51                )
52                if not stream_is_available:
53                    logger.warning(f"Stream {stream.name} is not available: {reason}")
54                    return False, reason
55        except Exception as error:
56            error_message = (
57                f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
58            )
59            logger.error(error_message, exc_info=True)
60            return False, error_message
61
62        return True, None

Tests if the input configuration can be used to successfully connect to the integration, e.g., if a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

class ConnectionChecker(abc.ABC):
13class ConnectionChecker(ABC):
14    """
15    Abstract base class for checking a connection
16    """
17
18    @abstractmethod
19    def check_connection(
20        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
21    ) -> Tuple[bool, Any]:
22        """
23        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
24        to the Stripe API.
25
26        :param source: source
27        :param logger: source logger
28        :param config: The user-provided configuration as specified by the source's spec.
29          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
30        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
31          and we can connect to the underlying data source using the provided configuration.
32          Otherwise, the input config cannot be used to connect to the underlying data source,
33          and the "error" object should describe what went wrong.
34          The error object will be cast to string to display the problem to the user.
35        """
36        pass

Abstract base class for checking a connection

@abstractmethod
def check_connection( self, source: airbyte_cdk.AbstractSource, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
18    @abstractmethod
19    def check_connection(
20        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
21    ) -> Tuple[bool, Any]:
22        """
23        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
24        to the Stripe API.
25
26        :param source: source
27        :param logger: source logger
28        :param config: The user-provided configuration as specified by the source's spec.
29          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
30        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
31          and we can connect to the underlying data source using the provided configuration.
32          Otherwise, the input config cannot be used to connect to the underlying data source,
33          and the "error" object should describe what went wrong.
34          The error object will be cast to string to display the problem to the user.
35        """
36        pass

Tests if the input configuration can be used to successfully connect to the integration, e.g., if a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

@dataclass(frozen=True)
class DynamicStreamCheckConfig:
16@dataclass(frozen=True)
17class DynamicStreamCheckConfig:
18    """Defines the configuration for dynamic stream during connection checking. This class specifies
19    what dynamic streams  in the stream template should be updated with value, supporting dynamic interpolation
20    and type enforcement."""
21
22    dynamic_stream_name: str
23    stream_count: int = 0

Defines the configuration for a dynamic stream during connection checking. This class specifies which dynamic streams in the stream template should be updated with a value, supporting dynamic interpolation and type enforcement.

DynamicStreamCheckConfig(dynamic_stream_name: str, stream_count: int = 0)
dynamic_stream_name: str
stream_count: int = 0