airbyte_cdk.sources.declarative.checks

 1#
 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
10from airbyte_cdk.sources.declarative.checks.check_stream import (
11    CheckStream,
12    DynamicStreamCheckConfig,
13)
14from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
15from airbyte_cdk.sources.declarative.models import (
16    CheckDynamicStream as CheckDynamicStreamModel,
17)
18from airbyte_cdk.sources.declarative.models import (
19    CheckStream as CheckStreamModel,
20)
21
22COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
23    "CheckStream": CheckStreamModel,
24    "CheckDynamicStream": CheckDynamicStreamModel,
25}
26
27__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
@dataclass
class CheckStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
 45@dataclass
 46class CheckStream(ConnectionChecker):
 47    """
 48    Checks the connections by checking availability of one or many streams selected by the developer
 49
 50    Attributes:
 51        stream_name (List[str]): names of streams to check
 52    """
 53
 54    stream_names: List[str]
 55    parameters: InitVar[Mapping[str, Any]]
 56    dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
 57
 58    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 59        self._parameters = parameters
 60        if self.dynamic_streams_check_configs is None:
 61            self.dynamic_streams_check_configs = []
 62
 63    def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
 64        """Logs an error and returns a formatted error message."""
 65        error_message = f"Encountered an error while {action}. Error: {error}"
 66        logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
 67        return False, error_message
 68
 69    def check_connection(
 70        self,
 71        source: Source,
 72        logger: logging.Logger,
 73        config: Mapping[str, Any],
 74    ) -> Tuple[bool, Any]:
 75        """Checks the connection to the source and its streams."""
 76        stream_names = self._get_stream_names(config)
 77        try:
 78            streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore  # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
 79            if not streams:
 80                return False, f"No streams to connect to from source {source}"
 81        except Exception as error:
 82            return self._log_error(logger, "discovering streams", error)
 83
 84        stream_name_to_stream = {s.name: s for s in streams}
 85        for stream_name in stream_names:
 86            if stream_name not in stream_name_to_stream:
 87                raise ValueError(
 88                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
 89                )
 90
 91            stream_availability, message = self._check_stream_availability(
 92                stream_name_to_stream, stream_name, logger
 93            )
 94            if not stream_availability:
 95                return stream_availability, message
 96
 97        should_check_dynamic_streams = (
 98            hasattr(source, "resolved_manifest")
 99            and hasattr(source, "dynamic_streams")
100            and self.dynamic_streams_check_configs
101        )
102
103        if should_check_dynamic_streams:
104            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
105
106        return True, None
107
108    def _get_stream_names(self, config: Mapping[str, Any]) -> List[str]:
109        if (
110            CHECK_STREAM_NAMES_CONFIG_KEY not in config
111            or config[CHECK_STREAM_NAMES_CONFIG_KEY] == []
112        ):
113            return self.stream_names
114        configured_stream_names = config[CHECK_STREAM_NAMES_CONFIG_KEY]
115        if not isinstance(configured_stream_names, list) or not all(
116            isinstance(stream_name, str) for stream_name in configured_stream_names
117        ):
118            raise ValueError(f"{CHECK_STREAM_NAMES_CONFIG_KEY} must be a list of strings.")
119        return configured_stream_names
120
121    def _check_stream_availability(
122        self,
123        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
124        stream_name: str,
125        logger: logging.Logger,
126    ) -> Tuple[bool, Any]:
127        """Checks if streams are available."""
128        try:
129            stream = stream_name_to_stream[stream_name]
130            stream_is_available, reason = evaluate_availability(stream, logger)
131            if not stream_is_available:
132                message = f"Stream {stream_name} is not available: {reason}"
133                logger.warning(message)
134                return stream_is_available, message
135        except Exception as error:
136            return self._log_error(logger, f"checking availability of stream {stream_name}", error)
137        return True, None
138
139    def _check_dynamic_streams_availability(
140        self,
141        source: Source,
142        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
143        logger: logging.Logger,
144    ) -> Tuple[bool, Any]:
145        """Checks the availability of dynamic streams."""
146        dynamic_streams = source.resolved_manifest.get("dynamic_streams", [])  # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
147        dynamic_stream_name_to_dynamic_stream = {
148            ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
149        }
150        generated_streams = self._map_generated_streams(source.dynamic_streams)  # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
151
152        for check_config in self.dynamic_streams_check_configs:  # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
153            if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
154                return (
155                    False,
156                    f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
157                )
158
159            generated = generated_streams.get(check_config.dynamic_stream_name, [])
160            stream_availability, message = self._check_generated_streams_availability(
161                generated, stream_name_to_stream, logger, check_config.stream_count
162            )
163            if not stream_availability:
164                return stream_availability, message
165
166        return True, None
167
168    def _map_generated_streams(
169        self, dynamic_streams: List[Dict[str, Any]]
170    ) -> Dict[str, List[Dict[str, Any]]]:
171        """Maps dynamic stream names to their corresponding generated streams."""
172        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
173        for stream in dynamic_streams:
174            mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
175        return mapped_streams
176
177    def _check_generated_streams_availability(
178        self,
179        generated_streams: List[Dict[str, Any]],
180        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
181        logger: logging.Logger,
182        max_count: Optional[int],
183    ) -> Tuple[bool, Any]:
184        """Checks availability of generated dynamic streams.
185
186        If `max_count` is `None`, all generated streams are checked. Otherwise, the
187        first `max_count` streams are checked (capped at the number of available streams).
188        """
189        streams_to_check = generated_streams if max_count is None else generated_streams[:max_count]
190        for declarative_stream in streams_to_check:
191            stream = stream_name_to_stream[declarative_stream["name"]]
192            try:
193                stream_is_available, reason = evaluate_availability(stream, logger)
194                if not stream_is_available:
195                    message = f"Dynamic Stream {stream.name} is not available: {reason}"
196                    logger.warning(message)
197                    return False, message
198            except Exception as error:
199                return self._log_error(
200                    logger, f"checking availability of dynamic stream {stream.name}", error
201                )
202        return True, None

Checks the connection by checking the availability of one or more streams selected by the developer.

Attributes:
  • stream_names (List[str]): names of the streams to check
CheckStream( stream_names: List[str], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None)
stream_names: List[str]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
def check_connection( self, source: airbyte_cdk.Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
 69    def check_connection(
 70        self,
 71        source: Source,
 72        logger: logging.Logger,
 73        config: Mapping[str, Any],
 74    ) -> Tuple[bool, Any]:
 75        """Checks the connection to the source and its streams."""
 76        stream_names = self._get_stream_names(config)
 77        try:
 78            streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore  # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
 79            if not streams:
 80                return False, f"No streams to connect to from source {source}"
 81        except Exception as error:
 82            return self._log_error(logger, "discovering streams", error)
 83
 84        stream_name_to_stream = {s.name: s for s in streams}
 85        for stream_name in stream_names:
 86            if stream_name not in stream_name_to_stream:
 87                raise ValueError(
 88                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
 89                )
 90
 91            stream_availability, message = self._check_stream_availability(
 92                stream_name_to_stream, stream_name, logger
 93            )
 94            if not stream_availability:
 95                return stream_availability, message
 96
 97        should_check_dynamic_streams = (
 98            hasattr(source, "resolved_manifest")
 99            and hasattr(source, "dynamic_streams")
100            and self.dynamic_streams_check_configs
101        )
102
103        if should_check_dynamic_streams:
104            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)
105
106        return True, None

Checks the connection to the source and its streams.

@dataclass
class CheckDynamicStream(airbyte_cdk.sources.declarative.checks.ConnectionChecker):
@dataclass
class CheckDynamicStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or more dynamic streams.

    Attributes:
        stream_count (int): maximum number of streams, taken from the start of ``source.streams``, to check
        use_check_availability (bool): when False, the check only requires the source to expose at least one stream
    """

    # TODO: Add field stream_names to check_connection for static streams
    #  https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

    stream_count: int
    parameters: InitVar[Mapping[str, Any]]
    use_check_availability: bool = True

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(
        self,
        source: Source,
        logger: logging.Logger,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Any]:
        """Checks that the source exposes streams and that the first ``stream_count`` are available.

        :return: ``(True, None)`` on success, otherwise ``(False, message)``
        """
        streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore  # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
        if not streams:
            return False, f"No streams to connect to from source {source}"
        if not self.use_check_availability:
            # Availability checks are disabled: exposing at least one stream is enough.
            return True, None

        try:
            candidates = streams[: min(self.stream_count, len(streams))]
            for stream in candidates:
                is_available, reason = evaluate_availability(stream, logger)
                if is_available:
                    continue
                message = f"Stream {stream.name} is not available: {reason}"
                logger.warning(message)
                return False, message
        except Exception as error:
            error_message = (
                f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
            )
            logger.error(error_message, exc_info=True)
            return False, error_message

        return True, None

Checks the connection by checking the availability of one or more dynamic streams.

Attributes:
  • stream_count (int): number of streams to check
CheckDynamicStream( stream_count: int, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], use_check_availability: bool = True)
stream_count: int
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
use_check_availability: bool = True
def check_connection( self, source: airbyte_cdk.Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
36    def check_connection(
37        self,
38        source: Source,
39        logger: logging.Logger,
40        config: Mapping[str, Any],
41    ) -> Tuple[bool, Any]:
42        streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore  # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
43
44        if len(streams) == 0:
45            return False, f"No streams to connect to from source {source}"
46        if not self.use_check_availability:
47            return True, None
48
49        try:
50            for stream in streams[: min(self.stream_count, len(streams))]:
51                stream_is_available, reason = evaluate_availability(stream, logger)
52                if not stream_is_available:
53                    message = f"Stream {stream.name} is not available: {reason}"
54                    logger.warning(message)
55                    return False, message
56        except Exception as error:
57            error_message = (
58                f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
59            )
60            logger.error(error_message, exc_info=True)
61            return False, error_message
62
63        return True, None

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

class ConnectionChecker(abc.ABC):
class ConnectionChecker(ABC):
    """
    Abstract base class for checking a connection.

    Concrete checkers implement ``check_connection``.
    """

    @abstractmethod
    def check_connection(
        self,
        source: Source,
        logger: logging.Logger,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Any]:
        """
        Tests whether the input configuration can be used to successfully connect to the integration,
        e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

        :param source: source
        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
          This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
        :return: A tuple of (boolean, error). If boolean is true, the connection check is successful
          and the underlying data source can be reached using the provided configuration.
          Otherwise, the input config cannot be used to connect to the underlying data source,
          and the "error" object should describe what went wrong.
          The error object will be cast to string to display the problem to the user.
        """
        ...

Abstract base class for checking a connection

@abstractmethod
def check_connection( self, source: airbyte_cdk.Source, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
18    @abstractmethod
19    def check_connection(
20        self,
21        source: Source,
22        logger: logging.Logger,
23        config: Mapping[str, Any],
24    ) -> Tuple[bool, Any]:
25        """
26        Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect
27        to the Stripe API.
28
29        :param source: source
30        :param logger: source logger
31        :param config: The user-provided configuration as specified by the source's spec.
32          This usually contains information required to check connection e.g. tokens, secrets and keys etc.
33        :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful
34          and we can connect to the underlying data source using the provided configuration.
35          Otherwise, the input config cannot be used to connect to the underlying data source,
36          and the "error" object should describe what went wrong.
37          The error object will be cast to string to display the problem to the user.
38        """
39        pass

Tests whether the input configuration can be used to successfully connect to the integration, e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

Parameters
  • source: source
  • logger: source logger
  • config: The user-provided configuration as specified by the source's spec. This usually contains the information required to check the connection, e.g. tokens, secrets, and keys.
Returns

A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.

@dataclass(frozen=True)
class DynamicStreamCheckConfig:
@dataclass(frozen=True)
class DynamicStreamCheckConfig:
    """Connection-check settings for a single dynamic stream.

    Attributes:
        dynamic_stream_name: name of the dynamic stream whose generated streams should be checked.
        stream_count: how many of the generated streams to check. ``None`` (the default) means all
            of them. Otherwise the first ``stream_count`` generated streams are checked
            (``CheckStream`` slices the generated streams with ``[:stream_count]``).

    Raises:
        ValueError: if ``stream_count`` is a negative integer.
    """

    dynamic_stream_name: str
    stream_count: Optional[int] = None

    def __post_init__(self) -> None:
        # A negative value used as a slice bound would silently drop streams from the end of
        # the list instead of limiting how many are checked, so reject it up front.
        if isinstance(self.stream_count, int) and self.stream_count < 0:
            raise ValueError(
                f"stream_count must be a non-negative integer or None, got {self.stream_count}."
            )

Defines the connection-check configuration for a single dynamic stream: the name of the dynamic stream whose generated streams should be checked and, optionally, how many of those generated streams to check (all of them when stream_count is None).

DynamicStreamCheckConfig(dynamic_stream_name: str, stream_count: Optional[int] = None)
dynamic_stream_name: str
stream_count: Optional[int] = None