airbyte_cdk.sources.declarative.checks
1# 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream 10from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream 11from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker 12from airbyte_cdk.sources.declarative.models import ( 13 CheckDynamicStream as CheckDynamicStreamModel, 14) 15from airbyte_cdk.sources.declarative.models import ( 16 CheckStream as CheckStreamModel, 17) 18 19COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 20 "CheckStream": CheckStreamModel, 21 "CheckDynamicStream": CheckDynamicStreamModel, 22} 23 24__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
16@dataclass 17class CheckStream(ConnectionChecker): 18 """ 19 Checks the connections by checking availability of one or many streams selected by the developer 20 21 Attributes: 22 stream_name (List[str]): names of streams to check 23 """ 24 25 stream_names: List[str] 26 parameters: InitVar[Mapping[str, Any]] 27 28 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 29 self._parameters = parameters 30 31 def check_connection( 32 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 33 ) -> Tuple[bool, Any]: 34 streams = source.streams(config=config) 35 stream_name_to_stream = {s.name: s for s in streams} 36 if len(streams) == 0: 37 return False, f"No streams to connect to from source {source}" 38 for stream_name in self.stream_names: 39 if stream_name not in stream_name_to_stream.keys(): 40 raise ValueError( 41 f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}." 42 ) 43 44 stream = stream_name_to_stream[stream_name] 45 availability_strategy = HttpAvailabilityStrategy() 46 try: 47 stream_is_available, reason = availability_strategy.check_availability( 48 stream, logger 49 ) 50 if not stream_is_available: 51 return False, reason 52 except Exception as error: 53 logger.error( 54 f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}" 55 ) 56 return False, f"Unable to connect to stream {stream_name} - {error}" 57 return True, None
Checks the connections by checking availability of one or many streams selected by the developer
Attributes:
- stream_name (List[str]): names of streams to check
31 def check_connection( 32 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 33 ) -> Tuple[bool, Any]: 34 streams = source.streams(config=config) 35 stream_name_to_stream = {s.name: s for s in streams} 36 if len(streams) == 0: 37 return False, f"No streams to connect to from source {source}" 38 for stream_name in self.stream_names: 39 if stream_name not in stream_name_to_stream.keys(): 40 raise ValueError( 41 f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}." 42 ) 43 44 stream = stream_name_to_stream[stream_name] 45 availability_strategy = HttpAvailabilityStrategy() 46 try: 47 stream_is_available, reason = availability_strategy.check_availability( 48 stream, logger 49 ) 50 if not stream_is_available: 51 return False, reason 52 except Exception as error: 53 logger.error( 54 f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}" 55 ) 56 return False, f"Unable to connect to stream {stream_name} - {error}" 57 return True, None
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
16@dataclass 17class CheckDynamicStream(ConnectionChecker): 18 """ 19 Checks the connections by checking availability of one or many dynamic streams 20 21 Attributes: 22 stream_count (int): numbers of streams to check 23 """ 24 25 # TODO: Add field stream_names to check_connection for static streams 26 # https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483 27 28 stream_count: int 29 parameters: InitVar[Mapping[str, Any]] 30 use_check_availability: bool = True 31 32 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 33 self._parameters = parameters 34 35 def check_connection( 36 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 37 ) -> Tuple[bool, Any]: 38 streams = source.streams(config=config) 39 40 if len(streams) == 0: 41 return False, f"No streams to connect to from source {source}" 42 if not self.use_check_availability: 43 return True, None 44 45 availability_strategy = HttpAvailabilityStrategy() 46 47 try: 48 for stream in streams[: min(self.stream_count, len(streams))]: 49 stream_is_available, reason = availability_strategy.check_availability( 50 stream, logger 51 ) 52 if not stream_is_available: 53 logger.warning(f"Stream {stream.name} is not available: {reason}") 54 return False, reason 55 except Exception as error: 56 error_message = ( 57 f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" 58 ) 59 logger.error(error_message, exc_info=True) 60 return False, error_message 61 62 return True, None
Checks the connections by checking availability of one or many dynamic streams
Attributes:
- stream_count (int): numbers of streams to check
35 def check_connection( 36 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 37 ) -> Tuple[bool, Any]: 38 streams = source.streams(config=config) 39 40 if len(streams) == 0: 41 return False, f"No streams to connect to from source {source}" 42 if not self.use_check_availability: 43 return True, None 44 45 availability_strategy = HttpAvailabilityStrategy() 46 47 try: 48 for stream in streams[: min(self.stream_count, len(streams))]: 49 stream_is_available, reason = availability_strategy.check_availability( 50 stream, logger 51 ) 52 if not stream_is_available: 53 logger.warning(f"Stream {stream.name} is not available: {reason}") 54 return False, reason 55 except Exception as error: 56 error_message = ( 57 f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" 58 ) 59 logger.error(error_message, exc_info=True) 60 return False, error_message 61 62 return True, None
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
13class ConnectionChecker(ABC): 14 """ 15 Abstract base class for checking a connection 16 """ 17 18 @abstractmethod 19 def check_connection( 20 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 21 ) -> Tuple[bool, Any]: 22 """ 23 Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect 24 to the Stripe API. 25 26 :param source: source 27 :param logger: source logger 28 :param config: The user-provided configuration as specified by the source's spec. 29 This usually contains information required to check connection e.g. tokens, secrets and keys etc. 30 :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful 31 and we can connect to the underlying data source using the provided configuration. 32 Otherwise, the input config cannot be used to connect to the underlying data source, 33 and the "error" object should describe what went wrong. 34 The error object will be cast to string to display the problem to the user. 35 """ 36 pass
Abstract base class for checking a connection
18 @abstractmethod 19 def check_connection( 20 self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] 21 ) -> Tuple[bool, Any]: 22 """ 23 Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect 24 to the Stripe API. 25 26 :param source: source 27 :param logger: source logger 28 :param config: The user-provided configuration as specified by the source's spec. 29 This usually contains information required to check connection e.g. tokens, secrets and keys etc. 30 :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful 31 and we can connect to the underlying data source using the provided configuration. 32 Otherwise, the input config cannot be used to connect to the underlying data source, 33 and the "error" object should describe what went wrong. 34 The error object will be cast to string to display the problem to the user. 35 """ 36 pass
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.