airbyte_ops_mcp.regression_tests
Live tests module for running connector validation and regression tests.
This module provides tools for testing Airbyte connectors against live data without using Dagger. It uses Docker SDK directly for container orchestration.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Live tests module for running connector validation and regression tests. 3 4This module provides tools for testing Airbyte connectors against live data 5without using Dagger. It uses Docker SDK directly for container orchestration. 6""" 7 8from airbyte_ops_mcp.regression_tests.connection_fetcher import ( 9 ConnectionData, 10 fetch_connection_data, 11) 12from airbyte_ops_mcp.regression_tests.connection_secret_retriever import ( 13 SecretRetrievalError, 14 enrich_config_with_secrets, 15 is_secret_retriever_enabled, 16 retrieve_unmasked_config, 17 should_use_secret_retriever, 18) 19from airbyte_ops_mcp.regression_tests.models import ( 20 Command, 21 ConnectorUnderTest, 22 ExecutionResult, 23 TargetOrControl, 24) 25 26__all__ = [ 27 "Command", 28 "ConnectionData", 29 "ConnectorUnderTest", 30 "ExecutionResult", 31 "SecretRetrievalError", 32 "TargetOrControl", 33 "enrich_config_with_secrets", 34 "fetch_connection_data", 35 "is_secret_retriever_enabled", 36 "retrieve_unmasked_config", 37 "should_use_secret_retriever", 38]
class Command(Enum):
    """Airbyte connector CLI commands and their input requirements."""

    CHECK = "check"
    DISCOVER = "discover"
    READ = "read"
    READ_WITH_STATE = "read-with-state"
    SPEC = "spec"

    def needs_config(self) -> bool:
        """Return True when the command requires a connector config file."""
        # Every command except `spec` consumes a config.
        return self is not Command.SPEC

    def needs_catalog(self) -> bool:
        """Return True when the command requires a configured catalog."""
        return self in (Command.READ, Command.READ_WITH_STATE)

    def needs_state(self) -> bool:
        """Return True when the command requires an input state file."""
        return self is Command.READ_WITH_STATE
Airbyte connector commands.
@dataclass
class ConnectionData:
    """Data fetched from an Airbyte Cloud connection.

    Aggregates the source configuration, the configured catalog, and
    (best-effort) the docker image coordinates of the source connector.
    """

    connection_id: str
    source_id: str
    source_name: str
    source_definition_id: str
    config: dict[str, Any]
    catalog: dict[str, Any]
    stream_names: list[str]
    workspace_id: str | None = None
    docker_repository: str | None = None
    docker_image_tag: str | None = None
    # Plain `= None` default; the previous `field(default=None)` was a
    # redundant wrapper around a simple immutable default.
    state: list[dict[str, Any]] | None = None

    @property
    def connector_image(self) -> str | None:
        """Get the full connector image name with tag.

        Returns:
            "<repository>:<tag>" when both parts are known, otherwise None.
        """
        if self.docker_repository and self.docker_image_tag:
            return f"{self.docker_repository}:{self.docker_image_tag}"
        return None
Data fetched from an Airbyte Cloud connection.
39 @property 40 def connector_image(self) -> str | None: 41 """Get the full connector image name with tag.""" 42 if self.docker_repository and self.docker_image_tag: 43 return f"{self.docker_repository}:{self.docker_image_tag}" 44 return None
Get the full connector image name with tag.
@dataclass
class ConnectorUnderTest:
    """A connector image participating in a test run.

    Validation tests use a single connector under test. Regression tests use
    two: the target and the control versions of the same connector.
    """

    image_name: str
    target_or_control: TargetOrControl

    @property
    def name(self) -> str:
        """Connector name, stripped of the registry prefix and tag."""
        repo_with_tag = self.image_name.replace("airbyte/", "")
        return repo_with_tag.split(":")[0]

    @property
    def name_without_type_prefix(self) -> str:
        """Connector name without the 'source-'/'destination-' prefix."""
        prefix = f"{self.actor_type.value}-"
        return self.name.replace(prefix, "")

    @property
    def version(self) -> str:
        """Connector version taken from the image tag."""
        repo_with_tag = self.image_name.replace("airbyte/", "")
        return repo_with_tag.split(":")[1]

    @property
    def actor_type(self) -> ActorType:
        """Infer whether this is a source or a destination from the image name."""
        if "airbyte/destination-" in self.image_name:
            return ActorType.DESTINATION
        if "airbyte/source-" in self.image_name:
            return ActorType.SOURCE
        raise ValueError(
            f"Can't infer the actor type. Connector image name {self.image_name} "
            "does not contain 'airbyte/source' or 'airbyte/destination'"
        )

    @classmethod
    def from_image_name(
        cls,
        image_name: str,
        target_or_control: TargetOrControl,
    ) -> ConnectorUnderTest:
        """Alternate constructor from a fully qualified image name."""
        return cls(image_name=image_name, target_or_control=target_or_control)
Represents a connector being tested.
In validation tests, there would be one connector under test. When running regression tests, there would be two connectors under test: the target and the control versions of the same connector.
75 @property 76 def name(self) -> str: 77 """Get connector name without registry prefix.""" 78 return self.image_name.replace("airbyte/", "").split(":")[0]
Get connector name without registry prefix.
80 @property 81 def name_without_type_prefix(self) -> str: 82 """Get connector name without actor type prefix.""" 83 return self.name.replace(f"{self.actor_type.value}-", "")
Get connector name without actor type prefix.
85 @property 86 def version(self) -> str: 87 """Get connector version from image tag.""" 88 return self.image_name.replace("airbyte/", "").split(":")[1]
Get connector version from image tag.
90 @property 91 def actor_type(self) -> ActorType: 92 """Infer actor type from image name.""" 93 if "airbyte/destination-" in self.image_name: 94 return ActorType.DESTINATION 95 elif "airbyte/source-" in self.image_name: 96 return ActorType.SOURCE 97 else: 98 raise ValueError( 99 f"Can't infer the actor type. Connector image name {self.image_name} " 100 "does not contain 'airbyte/source' or 'airbyte/destination'" 101 )
Infer actor type from image name.
103 @classmethod 104 def from_image_name( 105 cls, 106 image_name: str, 107 target_or_control: TargetOrControl, 108 ) -> ConnectorUnderTest: 109 """Create a ConnectorUnderTest from an image name.""" 110 return cls(image_name, target_or_control)
Create a ConnectorUnderTest from an image name.
@dataclass
class ExecutionResult:
    """Result of executing a connector command.

    Wraps the captured stdout/stderr of a connector run and lazily parses the
    stdout into Airbyte protocol messages, with helpers to slice the messages
    by type/stream and to persist artifacts.
    """

    connector_under_test: ConnectorUnderTest
    command: Command
    stdout_file_path: Path
    stderr_file_path: Path
    success: bool
    exit_code: int
    configured_catalog: ConfiguredAirbyteCatalog | None = None
    config: dict[str, Any] | None = None
    # Internal parse cache, populated on first access of `airbyte_messages`.
    _airbyte_messages: list[AirbyteMessage] = field(default_factory=list)
    _messages_loaded: bool = field(default=False, repr=False)

    @property
    def logger(self) -> logging.Logger:
        """Logger named after the connector role and command (e.g. 'target-read')."""
        return logging.getLogger(
            f"{self.connector_under_test.target_or_control.value}-{self.command.value}"
        )

    @property
    def airbyte_messages(self) -> list[AirbyteMessage]:
        """Parse and return all Airbyte messages from stdout.

        Results are cached in `_airbyte_messages`/`_messages_loaded`. This was
        previously declared as `@cached_property` on top of that manual cache,
        which double-cached the same data; a plain `@property` with the manual
        cache is sufficient and behaves identically.

        Lines that are not valid Airbyte messages (e.g. plain log output) are
        skipped silently.
        """
        if self._messages_loaded:
            return self._airbyte_messages

        messages = []
        for line in self.stdout_file_path.read_text().splitlines():
            line = line.strip()
            if not line:
                continue
            # NOTE(review): `parse_raw` is the pydantic v1-style API (deprecated
            # in v2 in favor of `model_validate_json`); kept as-is here since
            # `model_dump_json` below implies v2, where it still works.
            with contextlib.suppress(ValidationError):
                messages.append(AirbyteMessage.parse_raw(line))
        self._airbyte_messages = messages
        self._messages_loaded = True
        return messages

    @property
    def configured_streams(self) -> list[str]:
        """Get list of configured stream names (empty when no catalog was set)."""
        if not self.configured_catalog:
            return []
        return [stream.stream.name for stream in self.configured_catalog.streams]

    def get_records(self) -> Iterator[AirbyteMessage]:
        """Iterate over record messages."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.RECORD:
                yield message

    def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
        """Get records for a specific stream."""
        for message in self.get_records():
            if message.record.stream == stream:
                yield message

    def get_states(self) -> Iterator[AirbyteMessage]:
        """Iterate over state messages."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.STATE:
                yield message

    def get_message_count_per_type(self) -> dict[AirbyteMessageType, int]:
        """Count messages by type."""
        counts: dict[AirbyteMessageType, int] = defaultdict(int)
        for message in self.airbyte_messages:
            counts[message.type] += 1
        return dict(counts)

    def get_record_count_per_stream(self) -> dict[str, int]:
        """Count records by stream name.

        Returns:
            Dictionary mapping stream names to record counts.
        """
        counts: dict[str, int] = defaultdict(int)
        for message in self.get_records():
            counts[message.record.stream] += 1
        return dict(counts)

    def get_catalog(self) -> AirbyteCatalog | None:
        """Get discovered catalog from messages (first CATALOG message, if any)."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.CATALOG:
                return message.catalog
        return None

    def get_spec(self) -> Any | None:
        """Get connector spec from messages (first SPEC message, if any)."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.SPEC:
                return message.spec
        return None

    def get_connection_status(self) -> Any | None:
        """Get connection status from check command (first status message, if any)."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.CONNECTION_STATUS:
                return message.connectionStatus
        return None

    def is_check_successful(self) -> bool:
        """Check if the check command was successful.

        Returns False when no connection-status message was emitted at all.
        """
        status = self.get_connection_status()
        if status is None:
            return False
        return status.status.value == "SUCCEEDED"

    def save_artifacts(self, output_dir: Path) -> None:
        """Save execution artifacts to the output directory.

        Writes one JSONL file per message type under `airbyte_messages/`, plus
        the configured catalog (if any) as `configured_catalog.json`.
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        airbyte_messages_dir = output_dir / "airbyte_messages"
        airbyte_messages_dir.mkdir(parents=True, exist_ok=True)

        messages_by_type: dict[str, list[str]] = defaultdict(list)
        for message in self.airbyte_messages:
            type_name = message.type.value.lower()
            messages_by_type[type_name].append(message.model_dump_json())

        for type_name, messages in messages_by_type.items():
            file_path = airbyte_messages_dir / f"{type_name}.jsonl"
            file_path.write_text("\n".join(messages))

        # Save configured catalog (input) if available
        if self.configured_catalog is not None:
            catalog_path = output_dir / "configured_catalog.json"
            catalog_path.write_text(self.configured_catalog.model_dump_json(indent=2))
            self.logger.info(f"Saved configured catalog to {catalog_path}")

        self.logger.info(f"Artifacts saved to {output_dir}")
Result of executing a connector command.
156 @cached_property 157 def airbyte_messages(self) -> list[AirbyteMessage]: 158 """Parse and return all Airbyte messages from stdout.""" 159 if self._messages_loaded: 160 return self._airbyte_messages 161 162 messages = [] 163 for line in self.stdout_file_path.read_text().splitlines(): 164 line = line.strip() 165 if not line: 166 continue 167 with contextlib.suppress(ValidationError): 168 messages.append(AirbyteMessage.parse_raw(line)) 169 self._airbyte_messages = messages 170 self._messages_loaded = True 171 return messages
Parse and return all Airbyte messages from stdout.
173 @property 174 def configured_streams(self) -> list[str]: 175 """Get list of configured stream names.""" 176 if not self.configured_catalog: 177 return [] 178 return [stream.stream.name for stream in self.configured_catalog.streams]
Get list of configured stream names.
180 def get_records(self) -> Iterator[AirbyteMessage]: 181 """Iterate over record messages.""" 182 for message in self.airbyte_messages: 183 if message.type is AirbyteMessageType.RECORD: 184 yield message
Iterate over record messages.
186 def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]: 187 """Get records for a specific stream.""" 188 for message in self.get_records(): 189 if message.record.stream == stream: 190 yield message
Get records for a specific stream.
192 def get_states(self) -> Iterator[AirbyteMessage]: 193 """Iterate over state messages.""" 194 for message in self.airbyte_messages: 195 if message.type is AirbyteMessageType.STATE: 196 yield message
Iterate over state messages.
198 def get_message_count_per_type(self) -> dict[AirbyteMessageType, int]: 199 """Count messages by type.""" 200 counts: dict[AirbyteMessageType, int] = defaultdict(int) 201 for message in self.airbyte_messages: 202 counts[message.type] += 1 203 return dict(counts)
Count messages by type.
205 def get_record_count_per_stream(self) -> dict[str, int]: 206 """Count records by stream name. 207 208 Returns: 209 Dictionary mapping stream names to record counts. 210 """ 211 counts: dict[str, int] = defaultdict(int) 212 for message in self.get_records(): 213 counts[message.record.stream] += 1 214 return dict(counts)
Count records by stream name.
Returns:
Dictionary mapping stream names to record counts.
216 def get_catalog(self) -> AirbyteCatalog | None: 217 """Get discovered catalog from messages.""" 218 for message in self.airbyte_messages: 219 if message.type is AirbyteMessageType.CATALOG: 220 return message.catalog 221 return None
Get discovered catalog from messages.
223 def get_spec(self) -> Any | None: 224 """Get connector spec from messages.""" 225 for message in self.airbyte_messages: 226 if message.type is AirbyteMessageType.SPEC: 227 return message.spec 228 return None
Get connector spec from messages.
230 def get_connection_status(self) -> Any | None: 231 """Get connection status from check command.""" 232 for message in self.airbyte_messages: 233 if message.type is AirbyteMessageType.CONNECTION_STATUS: 234 return message.connectionStatus 235 return None
Get connection status from check command.
237 def is_check_successful(self) -> bool: 238 """Check if the check command was successful.""" 239 status = self.get_connection_status() 240 if status is None: 241 return False 242 return status.status.value == "SUCCEEDED"
Check if the check command was successful.
244 def save_artifacts(self, output_dir: Path) -> None: 245 """Save execution artifacts to the output directory.""" 246 output_dir.mkdir(parents=True, exist_ok=True) 247 248 airbyte_messages_dir = output_dir / "airbyte_messages" 249 airbyte_messages_dir.mkdir(parents=True, exist_ok=True) 250 251 messages_by_type: dict[str, list[str]] = defaultdict(list) 252 for message in self.airbyte_messages: 253 type_name = message.type.value.lower() 254 messages_by_type[type_name].append(message.model_dump_json()) 255 256 for type_name, messages in messages_by_type.items(): 257 file_path = airbyte_messages_dir / f"{type_name}.jsonl" 258 file_path.write_text("\n".join(messages)) 259 260 # Save configured catalog (input) if available 261 if self.configured_catalog is not None: 262 catalog_path = output_dir / "configured_catalog.json" 263 catalog_path.write_text(self.configured_catalog.model_dump_json(indent=2)) 264 self.logger.info(f"Saved configured catalog to {catalog_path}") 265 266 self.logger.info(f"Artifacts saved to {output_dir}")
Save execution artifacts to the output directory.
class SecretRetrievalError(Exception):
    """Signals a failure to retrieve unmasked connection secrets.

    Raised when USE_CONNECTION_SECRET_RETRIEVER is enabled but the secrets
    cannot be fetched (e.g., EU data residency restrictions).
    """
Raised when secret retrieval fails.
This exception is raised when USE_CONNECTION_SECRET_RETRIEVER is enabled but secrets cannot be retrieved (e.g., EU data residency restrictions).
class TargetOrControl(Enum):
    """Role of a connector in a regression run: target (new) or control (baseline)."""

    # The candidate version being validated.
    TARGET = "target"
    # The known-good baseline version it is compared against.
    CONTROL = "control"
Indicates whether a connector is the target (new) or control (baseline) version.
def enrich_config_with_secrets(
    connection_data: ConnectionData,
    retrieval_reason: str = "MCP live tests",
    raise_on_failure: bool = True,
) -> ConnectionData:
    """Swap a connection's masked config for the unmasked one.

    Takes a ConnectionData (typically from the public Cloud API, where secret
    values are masked) and returns a copy whose config holds the unmasked
    secrets fetched through the internal connection-retriever.

    Args:
        connection_data: The connection data to enrich.
        retrieval_reason: Reason for retrieval (for audit logging).
        raise_on_failure: If True (default), raise SecretRetrievalError when
            secrets cannot be retrieved. If False, return the original
            connection_data with masked secrets (legacy behavior).

    Returns:
        A new ConnectionData with unmasked config.

    Raises:
        SecretRetrievalError: If raise_on_failure is True and secrets cannot
            be retrieved (e.g., due to EU data residency restrictions).
    """
    fresh_config = retrieve_unmasked_config(
        connection_id=connection_data.connection_id,
        retrieval_reason=retrieval_reason,
    )

    if fresh_config is None:
        failure_details = (
            "Could not retrieve unmasked secrets for connection "
            f"{connection_data.connection_id}. This may be due to EU data "
            "residency restrictions or database connectivity issues. "
            "The connection's credentials cannot be used for regression testing."
        )
        logger.warning(failure_details)
        if not raise_on_failure:
            # Legacy fallback: hand back the masked config untouched.
            return connection_data
        raise SecretRetrievalError(failure_details)

    logger.info(
        f"Successfully enriched config with unmasked secrets for "
        f"{connection_data.connection_id}"
    )

    # Never mutate the caller's object; return an enriched copy instead.
    return replace(connection_data, config=fresh_config)
Enrich connection data with unmasked secrets from internal retriever.
This function takes a ConnectionData object (typically from the public Cloud API with masked secrets) and replaces the config with unmasked secrets from the internal connection-retriever.
Arguments:
- connection_data: The connection data to enrich.
- retrieval_reason: Reason for retrieval (for audit logging).
- raise_on_failure: If True (default), raise SecretRetrievalError when secrets cannot be retrieved. If False, return the original connection_data with masked secrets (legacy behavior).
Returns:
A new ConnectionData with unmasked config.
Raises:
- SecretRetrievalError: If raise_on_failure is True and secrets cannot be retrieved (e.g., due to EU data residency restrictions).
def fetch_connection_data(
    connection_id: str,
    client_id: str | None = None,
    client_secret: str | None = None,
) -> ConnectionData:
    """Fetch connection configuration and catalog from Airbyte Cloud.

    Args:
        connection_id: The connection ID to fetch data for.
        client_id: Airbyte Cloud client ID (defaults to env var).
        client_secret: Airbyte Cloud client secret (defaults to env var).

    Returns:
        ConnectionData with config and catalog.

    Raises:
        PyAirbyteInputError: If credentials are missing or an API request fails.
    """
    client_id = client_id or os.getenv("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = client_secret or os.getenv("AIRBYTE_CLOUD_CLIENT_SECRET")

    if not client_id or not client_secret:
        raise PyAirbyteInputError(
            message="Missing Airbyte Cloud credentials",
            context={
                "hint": "Set AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET env vars"
            },
        )

    access_token = _get_access_token(client_id, client_secret)
    public_api_root = constants.CLOUD_API_ROOT
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    # Get connection details
    conn_response = requests.get(
        f"{public_api_root}/connections/{connection_id}",
        headers=headers,
        timeout=30,
    )

    if conn_response.status_code != 200:
        raise PyAirbyteInputError(
            message=f"Failed to get connection: {conn_response.status_code}",
            context={"connection_id": connection_id, "response": conn_response.text},
        )

    conn_data = conn_response.json()
    source_id = conn_data["sourceId"]

    # Get source details (includes config)
    source_response = requests.get(
        f"{public_api_root}/sources/{source_id}",
        headers=headers,
        timeout=30,
    )

    if source_response.status_code != 200:
        raise PyAirbyteInputError(
            message=f"Failed to get source: {source_response.status_code}",
            context={"source_id": source_id, "response": source_response.text},
        )

    source_data = source_response.json()
    source_definition_id = source_data.get("definitionId", "")

    # Try to get docker repository and image tag from source definition version
    docker_repository = None
    docker_image_tag = None
    if source_definition_id:
        try:
            # Use the Config API to get version info for the source
            config_api_root = constants.CLOUD_CONFIG_API_ROOT
            version_response = requests.post(
                f"{config_api_root}/actor_definition_versions/get_for_source",
                json={"sourceId": source_id},
                headers=headers,
                timeout=30,
            )
            if version_response.status_code == 200:
                version_data = version_response.json()
                docker_repository = version_data.get("dockerRepository")
                docker_image_tag = version_data.get("dockerImageTag")
        except (requests.RequestException, ValueError):
            # Non-fatal: docker info is best-effort enrichment and the caller
            # can proceed without it. Narrowed from a bare `except Exception`
            # so programming errors (KeyError, TypeError, ...) are no longer
            # silently swallowed; ValueError also covers JSON decode failures.
            pass

    # Build configured catalog from connection streams
    streams_config = conn_data.get("configurations", {}).get("streams", [])
    stream_names = [s["name"] for s in streams_config]

    # Build Airbyte protocol catalog format
    catalog = _build_configured_catalog(
        streams_config, source_id, headers, public_api_root
    )

    return ConnectionData(
        connection_id=connection_id,
        source_id=source_id,
        source_name=source_data.get("name", ""),
        source_definition_id=source_definition_id,
        config=source_data.get("configuration", {}),
        catalog=catalog,
        stream_names=stream_names,
        workspace_id=conn_data.get("workspaceId"),
        docker_repository=docker_repository,
        docker_image_tag=docker_image_tag,
    )
Fetch connection configuration and catalog from Airbyte Cloud.
Arguments:
- connection_id: The connection ID to fetch data for.
- client_id: Airbyte Cloud client ID (defaults to env var).
- client_secret: Airbyte Cloud client secret (defaults to env var).
Returns:
ConnectionData with config and catalog.
Raises:
- PyAirbyteInputError: If the API request fails.
def is_secret_retriever_enabled() -> bool:
    """Report whether connection secret retrieval is switched on.

    Returns:
        True if USE_CONNECTION_SECRET_RETRIEVER is set to a truthy value.
    """
    flag = os.getenv(ENV_USE_SECRET_RETRIEVER, "")
    return flag.lower() in {"true", "1", "yes"}
Check if secret retrieval is enabled via environment variable.
Returns:
True if USE_CONNECTION_SECRET_RETRIEVER is set to a truthy value.
def retrieve_unmasked_config(
    connection_id: str,
    retrieval_reason: str = "MCP live tests",
) -> dict | None:
    """Fetch the source config, with secrets unmasked, for a connection.

    Queries the internal Postgres database through the vendored
    connection-retriever (the public API only exposes masked secrets).

    Args:
        connection_id: The Airbyte Cloud connection ID.
        retrieval_reason: Reason for retrieval (for audit logging).

    Returns:
        The unmasked source config dict, or None if retrieval fails.
    """
    # The source config is the only object needed for secret enrichment.
    candidates = retrieve_objects(
        connection_objects=[ConnectionObject.SOURCE_CONFIG],
        retrieval_reason=retrieval_reason,
        connection_id=connection_id,
    )

    if not candidates:
        logger.warning(
            f"No connection data found for connection ID {connection_id} "
            "via connection-retriever"
        )
        return None

    source_config = candidates[0].source_config
    return dict(source_config) if source_config else None
Retrieve unmasked source config from vendored connection-retriever.
This function directly queries the internal Postgres database to get the source configuration with unmasked secrets.
Arguments:
- connection_id: The Airbyte Cloud connection ID.
- retrieval_reason: Reason for retrieval (for audit logging).
Returns:
The unmasked source config dict, or None if retrieval fails.
def should_use_secret_retriever() -> bool:
    """Decide whether secret retrieval should be used (alias of the env check).

    Returns:
        True if USE_CONNECTION_SECRET_RETRIEVER env var is set to a truthy value.
    """
    return is_secret_retriever_enabled()
Check if secret retrieval should be used.
Returns:
True if USE_CONNECTION_SECRET_RETRIEVER env var is set to a truthy value.