airbyte_cdk.sources.declarative.checks
1# 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream 10from airbyte_cdk.sources.declarative.checks.check_stream import ( 11 CheckStream, 12 DynamicStreamCheckConfig, 13) 14from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker 15from airbyte_cdk.sources.declarative.models import ( 16 CheckDynamicStream as CheckDynamicStreamModel, 17) 18from airbyte_cdk.sources.declarative.models import ( 19 CheckStream as CheckStreamModel, 20) 21 22COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 23 "CheckStream": CheckStreamModel, 24 "CheckDynamicStream": CheckDynamicStreamModel, 25} 26 27__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
@dataclass
class CheckStream(ConnectionChecker):
    """
    Checks the connection by checking availability of one or many streams selected by the developer.

    Attributes:
        stream_names (List[str]): names of streams to check
        parameters (Mapping[str, Any]): init-only value kept on the instance as ``_parameters``
        dynamic_streams_check_configs (Optional[List[DynamicStreamCheckConfig]]): optional check
            settings for dynamic streams; ``None`` is replaced with an empty list after init
    """

    stream_names: List[str]
    parameters: InitVar[Mapping[str, Any]]
    dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # Normalize the optional field so later code can iterate it unconditionally.
        if self.dynamic_streams_check_configs is None:
            self.dynamic_streams_check_configs = []

    def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
        """
        Logs an error and returns a formatted error message.

        :param logger: logger that receives the error record
        :param action: description of what was being attempted, inserted into the message
        :param error: the exception that was caught
        :return: ``(False, error_message)`` so callers can return the result directly
        """
        error_message = f"Encountered an error while {action}. Error: {error}"
        # The separating space keeps the traceback header from being glued onto the error text.
        logger.error(
            f"{error_message} Error traceback: \n {traceback.format_exc()}", exc_info=True
        )
        return False, error_message

    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Checks the connection to the source and its streams.

        :param source: source whose streams are discovered and checked
        :param logger: source logger
        :param config: user-provided configuration passed to ``source.streams``
        :return: ``(True, None)`` when every requested stream (and every configured dynamic
            stream) is available, otherwise ``(False, message)``
        :raises ValueError: if a name in ``stream_names`` is not among the discovered streams
        """
        try:
            streams = source.streams(config=config)
            if not streams:
                return False, f"No streams to connect to from source {source}"
        except Exception as error:
            return self._log_error(logger, "discovering streams", error)

        stream_name_to_stream = {s.name: s for s in streams}
        for stream_name in self.stream_names:
            if stream_name not in stream_name_to_stream:
                raise ValueError(
                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
                )

            stream_availability, message = self._check_stream_availability(
                stream_name_to_stream, stream_name, logger
            )
            if not stream_availability:
                return stream_availability, message

        # Dynamic streams are only checked when the source exposes both attributes and at
        # least one check config was provided.
        should_check_dynamic_streams = (
            hasattr(source, "resolved_manifest")
            and hasattr(source, "dynamic_streams")
            and self.dynamic_streams_check_configs
        )

        if should_check_dynamic_streams:
            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)

        return True, None

    def _check_stream_availability(
        self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
    ) -> Tuple[bool, Any]:
        """
        Checks if a single named stream is available.

        :param stream_name_to_stream: discovered streams keyed by name
        :param stream_name: name of the stream to check
        :param logger: source logger
        :return: ``(True, None)`` when available, otherwise ``(False, message)``
        """
        availability_strategy = HttpAvailabilityStrategy()
        try:
            stream = stream_name_to_stream[stream_name]
            stream_is_available, reason = availability_strategy.check_availability(stream, logger)
            if not stream_is_available:
                message = f"Stream {stream_name} is not available: {reason}"
                logger.warning(message)
                return stream_is_available, message
        except Exception as error:
            return self._log_error(logger, f"checking availability of stream {stream_name}", error)
        return True, None

    def _check_dynamic_streams_availability(
        self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
    ) -> Tuple[bool, Any]:
        """
        Checks the availability of dynamic streams.

        :param source: source exposing ``resolved_manifest`` and ``dynamic_streams``
        :param stream_name_to_stream: discovered streams keyed by name
        :param logger: source logger
        :return: ``(True, None)`` when every configured dynamic stream passes, otherwise
            ``(False, message)`` for the first failure
        """
        dynamic_streams = source.resolved_manifest.get("dynamic_streams", [])  # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
        # Manifest entries without a "name" are addressed by their position.
        dynamic_stream_name_to_dynamic_stream = {
            ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
        }
        generated_streams = self._map_generated_streams(source.dynamic_streams)  # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method

        for check_config in self.dynamic_streams_check_configs:  # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
            if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
                return (
                    False,
                    f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
                )

            generated = generated_streams.get(check_config.dynamic_stream_name, [])
            stream_availability, message = self._check_generated_streams_availability(
                generated, stream_name_to_stream, logger, check_config.stream_count
            )
            if not stream_availability:
                return stream_availability, message

        return True, None

    def _map_generated_streams(
        self, dynamic_streams: List[Dict[str, Any]]
    ) -> Dict[str, List[Dict[str, Any]]]:
        """
        Maps dynamic stream names to their corresponding generated streams.

        :param dynamic_streams: generated stream definitions, each carrying a
            ``"dynamic_stream_name"`` key
        :return: generated streams grouped by that key, in input order
        """
        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in dynamic_streams:
            mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
        return mapped_streams

    def _check_generated_streams_availability(
        self,
        generated_streams: List[Dict[str, Any]],
        stream_name_to_stream: Dict[str, Any],
        logger: logging.Logger,
        max_count: int,
    ) -> Tuple[bool, Any]:
        """
        Checks availability of generated dynamic streams.

        :param generated_streams: generated stream definitions for one dynamic stream
        :param stream_name_to_stream: discovered streams keyed by name
        :param logger: source logger
        :param max_count: upper bound on how many generated streams are checked
        :return: ``(True, None)`` when every checked stream is available, otherwise
            ``(False, message)`` for the first failure
        """
        availability_strategy = HttpAvailabilityStrategy()
        for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
            stream_name = declarative_stream.get("name")
            try:
                # The lookup sits inside the try so that a generated stream missing from the
                # discovered streams is reported as a failed check instead of escaping as a
                # bare KeyError, matching _check_stream_availability.
                stream = stream_name_to_stream[stream_name]
                stream_is_available, reason = availability_strategy.check_availability(
                    stream, logger
                )
                if not stream_is_available:
                    message = f"Dynamic Stream {stream.name} is not available: {reason}"
                    logger.warning(message)
                    return False, message
            except Exception as error:
                return self._log_error(
                    logger, f"checking availability of dynamic stream {stream_name}", error
                )
        return True, None
Checks the connections by checking availability of one or many streams selected by the developer
Attributes:
- stream_names (List[str]): names of streams to check
def check_connection(
    self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
    """
    Verify the source by discovering its streams and probing the requested ones.

    :param source: source whose streams are discovered via ``source.streams``
    :param logger: source logger
    :param config: user-provided configuration forwarded to ``source.streams``
    :return: ``(True, None)`` on success, otherwise ``(False, message)``
    :raises ValueError: if a requested stream name was not discovered
    """
    # Discovery and the emptiness test share one try so any failure in either is reported
    # through _log_error rather than raised.
    try:
        discovered = source.streams(config=config)
        if not discovered:
            return False, f"No streams to connect to from source {source}"
    except Exception as exc:
        return self._log_error(logger, "discovering streams", exc)

    streams_by_name = {candidate.name: candidate for candidate in discovered}

    for requested in self.stream_names:
        if requested not in streams_by_name:
            raise ValueError(
                f"{requested} is not part of the catalog. Expected one of {list(streams_by_name.keys())}."
            )

        is_available, detail = self._check_stream_availability(
            streams_by_name, requested, logger
        )
        if not is_available:
            return is_available, detail

    # Dynamic streams are only probed when the source exposes both attributes and at least
    # one check config is present.
    wants_dynamic_check = (
        hasattr(source, "resolved_manifest")
        and hasattr(source, "dynamic_streams")
        and self.dynamic_streams_check_configs
    )
    if not wants_dynamic_check:
        return True, None

    return self._check_dynamic_streams_availability(source, streams_by_name, logger)
Checks the connection to the source and its streams.
@dataclass
class CheckDynamicStream(ConnectionChecker):
    """
    Checks the connection by checking availability of one or many dynamic streams.

    Attributes:
        stream_count (int): number of streams to check
        parameters (Mapping[str, Any]): init-only value kept on the instance as ``_parameters``
        use_check_availability (bool): when False, the check succeeds as soon as the source
            reports at least one stream, without probing any of them
    """

    # TODO: Add field stream_names to check_connection for static streams
    #  https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

    stream_count: int
    parameters: InitVar[Mapping[str, Any]]
    use_check_availability: bool = True

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Checks the connection by probing up to ``stream_count`` of the source's streams.

        :param source: source whose streams are discovered via ``source.streams``
        :param logger: source logger
        :param config: user-provided configuration forwarded to ``source.streams``
        :return: ``(True, None)`` when every probed stream is available (or probing is
            disabled), ``(False, reason)`` for the first unavailable stream, or
            ``(False, error_message)`` when probing raises
        """
        streams = source.streams(config=config)

        if not streams:
            return False, f"No streams to connect to from source {source}"
        if not self.use_check_availability:
            return True, None

        availability_strategy = HttpAvailabilityStrategy()

        # Track the stream being probed explicitly. The loop variable is unbound when the
        # failure happens before the first iteration, and reading it in the handler would
        # replace the real error with an UnboundLocalError.
        checked_stream_name = None
        try:
            for stream in streams[: min(self.stream_count, len(streams))]:
                checked_stream_name = stream.name
                stream_is_available, reason = availability_strategy.check_availability(
                    stream, logger
                )
                if not stream_is_available:
                    logger.warning(f"Stream {stream.name} is not available: {reason}")
                    return False, reason
        except Exception as error:
            target = "streams" if checked_stream_name is None else f"stream {checked_stream_name}"
            error_message = f"Encountered an error trying to connect to {target}. Error: {error}"
            logger.error(error_message, exc_info=True)
            return False, error_message

        return True, None
Checks the connections by checking availability of one or many dynamic streams
Attributes:
- stream_count (int): number of streams to check
def check_connection(
    self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
    """
    Checks the connection by probing up to ``stream_count`` of the source's streams.

    :param source: source whose streams are discovered via ``source.streams``
    :param logger: source logger
    :param config: user-provided configuration forwarded to ``source.streams``
    :return: ``(True, None)`` when every probed stream is available (or probing is
        disabled), ``(False, reason)`` for the first unavailable stream, or
        ``(False, error_message)`` when probing raises
    """
    streams = source.streams(config=config)

    if not streams:
        return False, f"No streams to connect to from source {source}"
    if not self.use_check_availability:
        return True, None

    availability_strategy = HttpAvailabilityStrategy()

    # Track the stream being probed explicitly. The loop variable is unbound when the
    # failure happens before the first iteration, and reading it in the handler would
    # replace the real error with an UnboundLocalError.
    checked_stream_name = None
    try:
        for stream in streams[: min(self.stream_count, len(streams))]:
            checked_stream_name = stream.name
            stream_is_available, reason = availability_strategy.check_availability(
                stream, logger
            )
            if not stream_is_available:
                logger.warning(f"Stream {stream.name} is not available: {reason}")
                return False, reason
    except Exception as error:
        target = "streams" if checked_stream_name is None else f"stream {checked_stream_name}"
        error_message = f"Encountered an error trying to connect to {target}. Error: {error}"
        logger.error(error_message, exc_info=True)
        return False, error_message

    return True, None
Tests if the input configuration can be used to successfully connect to the integration, e.g., if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
class ConnectionChecker(ABC):
    """
    Interface for components that check a connection.
    """

    @abstractmethod
    def check_connection(
        self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Decide whether the given configuration can be used to connect to the integration,
        for example whether a provided Stripe API token can reach the Stripe API.

        :param source: source
        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
          It usually carries what a connection check needs, e.g. tokens, secrets and keys.
        :return: A ``(boolean, error)`` tuple. A true boolean means the check succeeded and
          the underlying data source can be reached with the provided configuration.
          Otherwise the configuration cannot be used to connect, and ``error`` should
          describe what went wrong; it is cast to string to show the problem to the user.
        """
Abstract base class for checking a connection
@abstractmethod
def check_connection(
    self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
    """
    Decide whether the given configuration can be used to connect to the integration,
    for example whether a provided Stripe API token can reach the Stripe API.

    :param source: source
    :param logger: source logger
    :param config: The user-provided configuration as specified by the source's spec.
      It usually carries what a connection check needs, e.g. tokens, secrets and keys.
    :return: A ``(boolean, error)`` tuple. A true boolean means the check succeeded and
      the underlying data source can be reached with the provided configuration.
      Otherwise the configuration cannot be used to connect, and ``error`` should
      describe what went wrong; it is cast to string to show the problem to the user.
    """
Tests if the input configuration can be used to successfully connect to the integration, e.g., if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
@dataclass(frozen=True)
class DynamicStreamCheckConfig:
    """Immutable settings for checking one dynamic stream during a connection check.

    ``CheckStream`` reads ``dynamic_stream_name`` to find the dynamic stream in the
    manifest and passes ``stream_count`` as the cap on how many of its generated
    streams are probed.
    """

    # Name of the dynamic stream these settings apply to.
    dynamic_stream_name: str
    # Cap on the number of generated streams to probe; defaults to 0.
    stream_count: int = 0
Defines the configuration for a dynamic stream during connection checking. This class specifies which dynamic stream should be checked (by name) and the maximum number of its generated streams to check.