airbyte_cdk.sources.declarative.checks
1# 2# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream 10from airbyte_cdk.sources.declarative.checks.check_stream import ( 11 CheckStream, 12 DynamicStreamCheckConfig, 13) 14from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker 15from airbyte_cdk.sources.declarative.models import ( 16 CheckDynamicStream as CheckDynamicStreamModel, 17) 18from airbyte_cdk.sources.declarative.models import ( 19 CheckStream as CheckStreamModel, 20) 21 22COMPONENTS_CHECKER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 23 "CheckStream": CheckStreamModel, 24 "CheckDynamicStream": CheckDynamicStreamModel, 25} 26 27__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
@dataclass
class CheckStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or many streams selected by the developer.

    Attributes:
        stream_names (List[str]): names of the streams to check. The list stored in the config under
            `CHECK_STREAM_NAMES_CONFIG_KEY` takes precedence when that key is present and non-empty.
        parameters (Mapping[str, Any]): init-only value, stored on the instance as `_parameters`.
        dynamic_streams_check_configs (Optional[List[DynamicStreamCheckConfig]]): dynamic streams whose
            generated streams should be checked as well. `None` is replaced by an empty list.
    """

    stream_names: List[str]
    parameters: InitVar[Mapping[str, Any]]
    dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # Normalize to a list so later code can iterate without a None check.
        if self.dynamic_streams_check_configs is None:
            self.dynamic_streams_check_configs = []

    def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
        """Logs an error and returns a `(False, message)` failure result.

        :param logger: logger the error is written to
        :param action: description of what was being attempted, embedded in the message
        :param error: the exception that was caught
        :return: `(False, error_message)`; the returned message does not contain the traceback
        """
        error_message = f"Encountered an error while {action}. Error: {error}"
        # Keep the message and the traceback header on separate lines; previously they were
        # concatenated without any separator ("... Error: <error>Error traceback: ...").
        logger.error(
            f"{error_message}\nError traceback: \n {traceback.format_exc()}", exc_info=True
        )
        return False, error_message

    def check_connection(
        self,
        source: Source,
        logger: logging.Logger,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Any]:
        """Checks the connection to the source and its streams.

        :param source: source whose streams are checked
        :param logger: source logger
        :param config: user-provided configuration
        :return: `(True, None)` when every checked stream is available, otherwise `(False, message)`
        :raises ValueError: if a requested stream name is not among the source's streams, or if the
            configured stream names are not a list of strings
        """
        stream_names = self._get_stream_names(config)
        try:
            streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
            if not streams:
                return False, f"No streams to connect to from source {source}"
        except Exception as error:
            return self._log_error(logger, "discovering streams", error)

        stream_name_to_stream = {s.name: s for s in streams}
        for stream_name in stream_names:
            if stream_name not in stream_name_to_stream:
                raise ValueError(
                    f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
                )

            stream_availability, message = self._check_stream_availability(
                stream_name_to_stream, stream_name, logger
            )
            if not stream_availability:
                return stream_availability, message

        # Dynamic streams are only checked for sources exposing both attributes and only when
        # at least one check config was provided.
        should_check_dynamic_streams = (
            hasattr(source, "resolved_manifest")
            and hasattr(source, "dynamic_streams")
            and self.dynamic_streams_check_configs
        )

        if should_check_dynamic_streams:
            return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)

        return True, None

    def _get_stream_names(self, config: Mapping[str, Any]) -> List[str]:
        """Returns the stream names to check.

        Uses `self.stream_names` unless the config holds a non-empty value under
        `CHECK_STREAM_NAMES_CONFIG_KEY`.

        :raises ValueError: if the configured value is not a list of strings
        """
        if (
            CHECK_STREAM_NAMES_CONFIG_KEY not in config
            or config[CHECK_STREAM_NAMES_CONFIG_KEY] == []
        ):
            return self.stream_names
        configured_stream_names = config[CHECK_STREAM_NAMES_CONFIG_KEY]
        if not isinstance(configured_stream_names, list) or not all(
            isinstance(stream_name, str) for stream_name in configured_stream_names
        ):
            raise ValueError(f"{CHECK_STREAM_NAMES_CONFIG_KEY} must be a list of strings.")
        return configured_stream_names

    def _check_stream_availability(
        self,
        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
        stream_name: str,
        logger: logging.Logger,
    ) -> Tuple[bool, Any]:
        """Checks whether the stream named `stream_name` is available.

        :return: `(True, None)` when available; `(False, message)` when unavailable or when the
            availability evaluation raised
        """
        try:
            stream = stream_name_to_stream[stream_name]
            stream_is_available, reason = evaluate_availability(stream, logger)
            if not stream_is_available:
                message = f"Stream {stream_name} is not available: {reason}"
                logger.warning(message)
                return stream_is_available, message
        except Exception as error:
            return self._log_error(logger, f"checking availability of stream {stream_name}", error)
        return True, None

    def _check_dynamic_streams_availability(
        self,
        source: Source,
        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
        logger: logging.Logger,
    ) -> Tuple[bool, Any]:
        """Checks the availability of dynamic streams.

        Every configured dynamic stream must be declared in the resolved manifest; its generated
        streams are then checked, limited by the config's `stream_count`.
        """
        dynamic_streams = source.resolved_manifest.get("dynamic_streams", [])  # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
        # Unnamed dynamic streams get a positional fallback name.
        dynamic_stream_name_to_dynamic_stream = {
            ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
        }
        generated_streams = self._map_generated_streams(source.dynamic_streams)  # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method

        for check_config in self.dynamic_streams_check_configs:  # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
            if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
                return (
                    False,
                    f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
                )

            generated = generated_streams.get(check_config.dynamic_stream_name, [])
            stream_availability, message = self._check_generated_streams_availability(
                generated, stream_name_to_stream, logger, check_config.stream_count
            )
            if not stream_availability:
                return stream_availability, message

        return True, None

    def _map_generated_streams(
        self, dynamic_streams: List[Dict[str, Any]]
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Maps dynamic stream names to their corresponding generated streams."""
        mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
        for stream in dynamic_streams:
            mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
        return mapped_streams

    def _check_generated_streams_availability(
        self,
        generated_streams: List[Dict[str, Any]],
        stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]],
        logger: logging.Logger,
        max_count: Optional[int],
    ) -> Tuple[bool, Any]:
        """Checks availability of generated dynamic streams.

        If `max_count` is `None`, all generated streams are checked. Otherwise, the
        first `max_count` streams are checked (capped at the number of available streams).

        :return: `(True, None)` when every checked stream is available, otherwise `(False, message)`
        """
        streams_to_check = generated_streams if max_count is None else generated_streams[:max_count]
        for declarative_stream in streams_to_check:
            generated_name = declarative_stream.get("name")
            # Report a generated stream that is missing from the source's streams as a failed
            # check instead of letting a KeyError escape.
            if generated_name not in stream_name_to_stream:
                message = (
                    f"Generated dynamic stream {generated_name} was not found among the streams "
                    f"of the source."
                )
                logger.warning(message)
                return False, message

            stream = stream_name_to_stream[generated_name]
            try:
                stream_is_available, reason = evaluate_availability(stream, logger)
                if not stream_is_available:
                    message = f"Dynamic Stream {stream.name} is not available: {reason}"
                    logger.warning(message)
                    return False, message
            except Exception as error:
                return self._log_error(
                    logger, f"checking availability of dynamic stream {stream.name}", error
                )
        return True, None
Checks the connection by checking the availability of one or more streams selected by the developer.
Attributes:
- stream_names (List[str]): names of the streams to check
def check_connection(
    self,
    source: Source,
    logger: logging.Logger,
    config: Mapping[str, Any],
) -> Tuple[bool, Any]:
    """Verify that the requested streams of `source` are available.

    Returns `(True, None)` on success and `(False, message)` when discovery fails, no stream is
    exposed, or a checked stream is unavailable. Raises `ValueError` when a requested stream name
    is not exposed by the source.
    """
    requested_names = self._get_stream_names(config)

    try:
        discovered: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker
        if not discovered:
            return False, f"No streams to connect to from source {source}"
    except Exception as discovery_error:
        return self._log_error(logger, "discovering streams", discovery_error)

    # Index the discovered streams by name for the lookups below.
    streams_by_name = {}
    for candidate in discovered:
        streams_by_name[candidate.name] = candidate

    for requested in requested_names:
        if requested not in streams_by_name:
            raise ValueError(
                f"{requested} is not part of the catalog. Expected one of {list(streams_by_name.keys())}."
            )

        is_available, detail = self._check_stream_availability(
            streams_by_name, requested, logger
        )
        if not is_available:
            return is_available, detail

    # Dynamic streams are only checked when the source exposes both attributes and at least
    # one dynamic stream check config is set.
    if not (hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams")):
        return True, None
    if not self.dynamic_streams_check_configs:
        return True, None
    return self._check_dynamic_streams_availability(source, streams_by_name, logger)
Checks the connection to the source and its streams.
@dataclass
class CheckDynamicStream(ConnectionChecker):
    """
    Checks the connection by checking the availability of one or many dynamic streams.

    Attributes:
        stream_count (int): number of streams to check (capped at the number of streams of the source)
        parameters (Mapping[str, Any]): init-only value, stored on the instance as `_parameters`
        use_check_availability (bool): when False, the check succeeds as soon as the source exposes
            at least one stream, without evaluating stream availability
    """

    # TODO: Add field stream_names to check_connection for static streams
    #  https://github.com/airbytehq/airbyte-python-cdk/pull/293#discussion_r1934933483

    stream_count: int
    parameters: InitVar[Mapping[str, Any]]
    use_check_availability: bool = True

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters

    def check_connection(
        self,
        source: Source,
        logger: logging.Logger,
        config: Mapping[str, Any],
    ) -> Tuple[bool, Any]:
        """Checks that the first `stream_count` streams of the source are available.

        :param source: source whose streams are checked
        :param logger: source logger
        :param config: user-provided configuration
        :return: `(True, None)` when the check passes, otherwise `(False, message)`
        """
        streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker

        if not streams:
            return False, f"No streams to connect to from source {source}"
        if not self.use_check_availability:
            return True, None

        # Track the stream being evaluated so the error handler never reads an unbound loop
        # variable when the failure happens before the first iteration starts.
        current_stream = None
        try:
            for stream in streams[: min(self.stream_count, len(streams))]:
                current_stream = stream
                stream_is_available, reason = evaluate_availability(stream, logger)
                if not stream_is_available:
                    message = f"Stream {stream.name} is not available: {reason}"
                    logger.warning(message)
                    return False, message
        except Exception as error:
            failed_stream_name = getattr(current_stream, "name", None)
            target = "streams" if failed_stream_name is None else f"stream {failed_stream_name}"
            error_message = f"Encountered an error trying to connect to {target}. Error: {error}"
            logger.error(error_message, exc_info=True)
            return False, error_message

        return True, None
Checks the connection by checking the availability of one or more dynamic streams.
Attributes:
- stream_count (int): number of streams to check
def check_connection(
    self,
    source: Source,
    logger: logging.Logger,
    config: Mapping[str, Any],
) -> Tuple[bool, Any]:
    """Checks that the first `stream_count` streams of the source are available.

    :param source: source whose streams are checked
    :param logger: source logger
    :param config: user-provided configuration
    :return: `(True, None)` when the check passes, otherwise `(False, message)`
    """
    streams: List[Union[Stream, AbstractStream]] = source.streams(config=config)  # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker

    if not streams:
        return False, f"No streams to connect to from source {source}"
    if not self.use_check_availability:
        return True, None

    # Track the stream being evaluated so the error handler never reads an unbound loop
    # variable when the failure happens before the first iteration starts.
    current_stream = None
    try:
        for stream in streams[: min(self.stream_count, len(streams))]:
            current_stream = stream
            stream_is_available, reason = evaluate_availability(stream, logger)
            if not stream_is_available:
                message = f"Stream {stream.name} is not available: {reason}"
                logger.warning(message)
                return False, message
    except Exception as error:
        failed_stream_name = getattr(current_stream, "name", None)
        target = "streams" if failed_stream_name is None else f"stream {failed_stream_name}"
        error_message = f"Encountered an error trying to connect to {target}. Error: {error}"
        logger.error(error_message, exc_info=True)
        return False, error_message

    return True, None
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
class ConnectionChecker(ABC):
    """
    Abstract interface for components that verify a connection to a source.
    """

    @abstractmethod
    def check_connection(
        self, source: Source, logger: logging.Logger, config: Mapping[str, Any]
    ) -> Tuple[bool, Any]:
        """
        Determines whether the given configuration can be used to connect to the integration,
        e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

        :param source: source
        :param logger: source logger
        :param config: The user-provided configuration as specified by the source's spec.
          It usually contains what is required to check the connection, e.g. tokens, secrets and keys.
        :return: A tuple of (boolean, error). When the boolean is true, the connection check succeeded
          and the underlying data source can be reached with the provided configuration.
          Otherwise the configuration cannot be used to connect, and the "error" object should
          describe what went wrong. The error object is cast to string to display the problem
          to the user.
        """
Abstract base class for checking a connection
@abstractmethod
def check_connection(
    self, source: Source, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
    """
    Determines whether the given configuration can be used to connect to the integration,
    e.g. whether a provided Stripe API token can be used to connect to the Stripe API.

    :param source: source
    :param logger: source logger
    :param config: The user-provided configuration as specified by the source's spec.
      It usually contains what is required to check the connection, e.g. tokens, secrets and keys.
    :return: A tuple of (boolean, error). When the boolean is true, the connection check succeeded
      and the underlying data source can be reached with the provided configuration.
      Otherwise the configuration cannot be used to connect, and the "error" object should
      describe what went wrong. The error object is cast to string to display the problem
      to the user.
    """
Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect to the Stripe API.
Parameters
- source: source
- logger: source logger
- config: The user-provided configuration as specified by the source's spec. This usually contains information required to check connection e.g. tokens, secrets and keys etc.
Returns
A tuple of (boolean, error). If boolean is true, then the connection check is successful and we can connect to the underlying data source using the provided configuration. Otherwise, the input config cannot be used to connect to the underlying data source, and the "error" object should describe what went wrong. The error object will be cast to string to display the problem to the user.
@dataclass(frozen=True)
class DynamicStreamCheckConfig:
    """Immutable settings for checking one dynamic stream during a connection check."""

    # Name of the dynamic stream whose generated streams are checked.
    dynamic_stream_name: str

    # Maximum number of generated streams to check; None means all of them are checked.
    stream_count: Optional[int] = None
Defines the configuration for checking a dynamic stream during connection checking. It names the dynamic stream whose generated streams should be checked and, optionally, the maximum number of those generated streams to check (all of them when no count is given).