airbyte_ops_mcp.regression_tests

Live tests module for running connector validation and regression tests.

This module provides tools for testing Airbyte connectors against live data without using Dagger. It uses Docker SDK directly for container orchestration.

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Live tests module for running connector validation and regression tests.
 3
 4This module provides tools for testing Airbyte connectors against live data
 5without using Dagger. It uses Docker SDK directly for container orchestration.
 6"""
 7
 8from airbyte_ops_mcp.regression_tests.connection_fetcher import (
 9    ConnectionData,
10    fetch_connection_data,
11)
12from airbyte_ops_mcp.regression_tests.connection_secret_retriever import (
13    SecretRetrievalError,
14    enrich_config_with_secrets,
15    is_secret_retriever_enabled,
16    retrieve_unmasked_config,
17    should_use_secret_retriever,
18)
19from airbyte_ops_mcp.regression_tests.models import (
20    Command,
21    ConnectorUnderTest,
22    ExecutionResult,
23    TargetOrControl,
24)
25
26__all__ = [
27    "Command",
28    "ConnectionData",
29    "ConnectorUnderTest",
30    "ExecutionResult",
31    "SecretRetrievalError",
32    "TargetOrControl",
33    "enrich_config_with_secrets",
34    "fetch_connection_data",
35    "is_secret_retriever_enabled",
36    "retrieve_unmasked_config",
37    "should_use_secret_retriever",
38]
class Command(enum.Enum):
class Command(Enum):
    """Commands that can be issued to an Airbyte connector.

    Each member's value is the command string, and the ``needs_*`` helpers
    report which inputs (config, catalog, state) the command requires.
    """

    CHECK = "check"
    DISCOVER = "discover"
    READ = "read"
    READ_WITH_STATE = "read-with-state"
    SPEC = "spec"

    def needs_config(self) -> bool:
        """Return True for CHECK, DISCOVER, READ and READ_WITH_STATE."""
        commands_with_config = (
            Command.CHECK,
            Command.DISCOVER,
            Command.READ,
            Command.READ_WITH_STATE,
        )
        return self in commands_with_config

    def needs_catalog(self) -> bool:
        """Return True for the two read commands, READ and READ_WITH_STATE."""
        return self is Command.READ or self is Command.READ_WITH_STATE

    def needs_state(self) -> bool:
        """Return True only for READ_WITH_STATE."""
        return self is Command.READ_WITH_STATE

Airbyte connector commands.

CHECK = <Command.CHECK: 'check'>
DISCOVER = <Command.DISCOVER: 'discover'>
READ = <Command.READ: 'read'>
READ_WITH_STATE = <Command.READ_WITH_STATE: 'read-with-state'>
SPEC = <Command.SPEC: 'spec'>
def needs_config(self) -> bool:
34    def needs_config(self) -> bool:
35        return self in {
36            Command.CHECK,
37            Command.DISCOVER,
38            Command.READ,
39            Command.READ_WITH_STATE,
40        }
def needs_catalog(self) -> bool:
42    def needs_catalog(self) -> bool:
43        return self in {Command.READ, Command.READ_WITH_STATE}
def needs_state(self) -> bool:
45    def needs_state(self) -> bool:
46        return self in {Command.READ_WITH_STATE}
@dataclass
class ConnectionData:
23@dataclass
24class ConnectionData:
25    """Data fetched from an Airbyte Cloud connection."""
26
27    connection_id: str
28    source_id: str
29    source_name: str
30    source_definition_id: str
31    config: dict[str, Any]
32    catalog: dict[str, Any]
33    stream_names: list[str]
34    workspace_id: str | None = None
35    docker_repository: str | None = None
36    docker_image_tag: str | None = None
37    state: list[dict[str, Any]] | None = field(default=None)
38
39    @property
40    def connector_image(self) -> str | None:
41        """Get the full connector image name with tag."""
42        if self.docker_repository and self.docker_image_tag:
43            return f"{self.docker_repository}:{self.docker_image_tag}"
44        return None

Data fetched from an Airbyte Cloud connection.

ConnectionData( connection_id: str, source_id: str, source_name: str, source_definition_id: str, config: dict[str, typing.Any], catalog: dict[str, typing.Any], stream_names: list[str], workspace_id: str | None = None, docker_repository: str | None = None, docker_image_tag: str | None = None, state: list[dict[str, typing.Any]] | None = None)
connection_id: str
source_id: str
source_name: str
source_definition_id: str
config: dict[str, typing.Any]
catalog: dict[str, typing.Any]
stream_names: list[str]
workspace_id: str | None = None
docker_repository: str | None = None
docker_image_tag: str | None = None
state: list[dict[str, typing.Any]] | None = None
connector_image: str | None
39    @property
40    def connector_image(self) -> str | None:
41        """Get the full connector image name with tag."""
42        if self.docker_repository and self.docker_image_tag:
43            return f"{self.docker_repository}:{self.docker_image_tag}"
44        return None

Get the full connector image name with tag.

@dataclass
class ConnectorUnderTest:
 63@dataclass
 64class ConnectorUnderTest:
 65    """Represents a connector being tested.
 66
 67    In validation tests, there would be one connector under test.
 68    When running regression tests, there would be two connectors under test:
 69    the target and the control versions of the same connector.
 70    """
 71
 72    image_name: str
 73    target_or_control: TargetOrControl
 74
 75    @property
 76    def name(self) -> str:
 77        """Get connector name without registry prefix."""
 78        return self.image_name.replace("airbyte/", "").split(":")[0]
 79
 80    @property
 81    def name_without_type_prefix(self) -> str:
 82        """Get connector name without actor type prefix."""
 83        return self.name.replace(f"{self.actor_type.value}-", "")
 84
 85    @property
 86    def version(self) -> str:
 87        """Get connector version from image tag."""
 88        return self.image_name.replace("airbyte/", "").split(":")[1]
 89
 90    @property
 91    def actor_type(self) -> ActorType:
 92        """Infer actor type from image name."""
 93        if "airbyte/destination-" in self.image_name:
 94            return ActorType.DESTINATION
 95        elif "airbyte/source-" in self.image_name:
 96            return ActorType.SOURCE
 97        else:
 98            raise ValueError(
 99                f"Can't infer the actor type. Connector image name {self.image_name} "
100                "does not contain 'airbyte/source' or 'airbyte/destination'"
101            )
102
103    @classmethod
104    def from_image_name(
105        cls,
106        image_name: str,
107        target_or_control: TargetOrControl,
108    ) -> ConnectorUnderTest:
109        """Create a ConnectorUnderTest from an image name."""
110        return cls(image_name, target_or_control)

Represents a connector being tested.

In validation tests, there would be one connector under test. When running regression tests, there would be two connectors under test: the target and the control versions of the same connector.

ConnectorUnderTest( image_name: str, target_or_control: TargetOrControl)
image_name: str
target_or_control: TargetOrControl
name: str
75    @property
76    def name(self) -> str:
77        """Get connector name without registry prefix."""
78        return self.image_name.replace("airbyte/", "").split(":")[0]

Get connector name without registry prefix.

name_without_type_prefix: str
80    @property
81    def name_without_type_prefix(self) -> str:
82        """Get connector name without actor type prefix."""
83        return self.name.replace(f"{self.actor_type.value}-", "")

Get connector name without actor type prefix.

version: str
85    @property
86    def version(self) -> str:
87        """Get connector version from image tag."""
88        return self.image_name.replace("airbyte/", "").split(":")[1]

Get connector version from image tag.

actor_type: airbyte_ops_mcp.regression_tests.models.ActorType
 90    @property
 91    def actor_type(self) -> ActorType:
 92        """Infer actor type from image name."""
 93        if "airbyte/destination-" in self.image_name:
 94            return ActorType.DESTINATION
 95        elif "airbyte/source-" in self.image_name:
 96            return ActorType.SOURCE
 97        else:
 98            raise ValueError(
 99                f"Can't infer the actor type. Connector image name {self.image_name} "
100                "does not contain 'airbyte/source' or 'airbyte/destination'"
101            )

Infer actor type from image name.

@classmethod
def from_image_name( cls, image_name: str, target_or_control: TargetOrControl) -> ConnectorUnderTest:
103    @classmethod
104    def from_image_name(
105        cls,
106        image_name: str,
107        target_or_control: TargetOrControl,
108    ) -> ConnectorUnderTest:
109        """Create a ConnectorUnderTest from an image name."""
110        return cls(image_name, target_or_control)

Create a ConnectorUnderTest from an image name.

@dataclass
class ExecutionResult:
@dataclass
class ExecutionResult:
    """Result of executing a connector command.

    Holds the outcome of the run (``success``, ``exit_code``), the paths of the
    captured stdout/stderr files, and helpers to inspect the Airbyte messages
    found in stdout. Messages are parsed on first access to
    ``airbyte_messages`` and then reused.
    """

    connector_under_test: ConnectorUnderTest
    command: Command
    stdout_file_path: Path
    stderr_file_path: Path
    success: bool
    exit_code: int
    configured_catalog: ConfiguredAirbyteCatalog | None = None
    config: dict[str, Any] | None = None
    # Already-parsed messages; only trusted when ``_messages_loaded`` is True.
    # Kept out of ``repr`` (like ``_messages_loaded``) so that printing a
    # result does not dump every parsed message.
    _airbyte_messages: list[AirbyteMessage] = field(default_factory=list, repr=False)
    _messages_loaded: bool = field(default=False, repr=False)

    @property
    def logger(self) -> logging.Logger:
        """Return a logger named ``"<target|control>-<command>"``."""
        return logging.getLogger(
            f"{self.connector_under_test.target_or_control.value}-{self.command.value}"
        )

    @cached_property
    def airbyte_messages(self) -> list[AirbyteMessage]:
        """Parse and return all Airbyte messages from stdout.

        Blank lines are skipped, and lines that fail validation as an
        ``AirbyteMessage`` are ignored.
        """
        if self._messages_loaded:
            return self._airbyte_messages

        messages: list[AirbyteMessage] = []
        # Iterate the file instead of ``read_text().splitlines()``:
        #  * the output is streamed rather than held in memory twice, and
        #  * only real newlines end a record. ``str.splitlines`` also breaks on
        #    characters such as U+2028/U+2029, which may appear unescaped inside
        #    JSON strings and would cut a valid message into invalid halves.
        with self.stdout_file_path.open() as stdout_file:
            for raw_line in stdout_file:
                line = raw_line.strip()
                if not line:
                    continue
                with contextlib.suppress(ValidationError):
                    # Pydantic v2 API, matching ``model_dump_json`` used in
                    # ``save_artifacts`` (``parse_raw`` is deprecated in v2).
                    messages.append(AirbyteMessage.model_validate_json(line))
        self._airbyte_messages = messages
        self._messages_loaded = True
        return messages

    @property
    def configured_streams(self) -> list[str]:
        """Get list of configured stream names (empty without a configured catalog)."""
        if not self.configured_catalog:
            return []
        return [stream.stream.name for stream in self.configured_catalog.streams]

    def get_records(self) -> Iterator[AirbyteMessage]:
        """Iterate over record messages."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.RECORD:
                yield message

    def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
        """Iterate over the record messages belonging to ``stream``."""
        for message in self.get_records():
            if message.record.stream == stream:
                yield message

    def get_states(self) -> Iterator[AirbyteMessage]:
        """Iterate over state messages."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.STATE:
                yield message

    def get_message_count_per_type(self) -> dict[AirbyteMessageType, int]:
        """Count messages by type.

        Returns:
            Dictionary mapping each message type that occurred to its count.
        """
        counts: dict[AirbyteMessageType, int] = defaultdict(int)
        for message in self.airbyte_messages:
            counts[message.type] += 1
        # Return a plain dict so a lookup of a missing key raises instead of
        # silently inserting a zero.
        return dict(counts)

    def get_record_count_per_stream(self) -> dict[str, int]:
        """Count records by stream name.

        Returns:
            Dictionary mapping stream names to record counts.
        """
        counts: dict[str, int] = defaultdict(int)
        for message in self.get_records():
            counts[message.record.stream] += 1
        return dict(counts)

    def get_catalog(self) -> AirbyteCatalog | None:
        """Get the catalog of the first CATALOG message, or None if there is none."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.CATALOG:
                return message.catalog
        return None

    def get_spec(self) -> Any | None:
        """Get the spec of the first SPEC message, or None if there is none."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.SPEC:
                return message.spec
        return None

    def get_connection_status(self) -> Any | None:
        """Get the status of the first CONNECTION_STATUS message, or None."""
        for message in self.airbyte_messages:
            if message.type is AirbyteMessageType.CONNECTION_STATUS:
                return message.connectionStatus
        return None

    def is_check_successful(self) -> bool:
        """Return True if a connection status was emitted and it is ``SUCCEEDED``."""
        status = self.get_connection_status()
        if status is None:
            return False
        return status.status.value == "SUCCEEDED"

    def save_artifacts(self, output_dir: Path) -> None:
        """Save execution artifacts to the output directory.

        Writes one ``airbyte_messages/<type>.jsonl`` file per message type
        (one serialized message per line) and, when a configured catalog is
        set, ``configured_catalog.json``. Directories are created as needed.

        Args:
            output_dir: Directory that receives the artifacts.
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        airbyte_messages_dir = output_dir / "airbyte_messages"
        airbyte_messages_dir.mkdir(parents=True, exist_ok=True)

        # Group the serialized messages by lower-cased type name.
        messages_by_type: dict[str, list[str]] = defaultdict(list)
        for message in self.airbyte_messages:
            type_name = message.type.value.lower()
            messages_by_type[type_name].append(message.model_dump_json())

        for type_name, messages in messages_by_type.items():
            file_path = airbyte_messages_dir / f"{type_name}.jsonl"
            file_path.write_text("\n".join(messages))

        # Save configured catalog (input) if available
        if self.configured_catalog is not None:
            catalog_path = output_dir / "configured_catalog.json"
            catalog_path.write_text(self.configured_catalog.model_dump_json(indent=2))
            self.logger.info(f"Saved configured catalog to {catalog_path}")

        self.logger.info(f"Artifacts saved to {output_dir}")

Result of executing a connector command.

ExecutionResult( connector_under_test: ConnectorUnderTest, command: Command, stdout_file_path: pathlib.Path, stderr_file_path: pathlib.Path, success: bool, exit_code: int, configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog | None = None, config: dict[str, typing.Any] | None = None, _airbyte_messages: list[airbyte_protocol.models.airbyte_protocol.AirbyteMessage] = <factory>, _messages_loaded: bool = False)
connector_under_test: ConnectorUnderTest
command: Command
stdout_file_path: pathlib.Path
stderr_file_path: pathlib.Path
success: bool
exit_code: int
configured_catalog: airbyte_protocol.models.airbyte_protocol.ConfiguredAirbyteCatalog | None = None
config: dict[str, typing.Any] | None = None
logger: logging.Logger
150    @property
151    def logger(self) -> logging.Logger:
152        return logging.getLogger(
153            f"{self.connector_under_test.target_or_control.value}-{self.command.value}"
154        )
airbyte_messages: list[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]
156    @cached_property
157    def airbyte_messages(self) -> list[AirbyteMessage]:
158        """Parse and return all Airbyte messages from stdout."""
159        if self._messages_loaded:
160            return self._airbyte_messages
161
162        messages = []
163        for line in self.stdout_file_path.read_text().splitlines():
164            line = line.strip()
165            if not line:
166                continue
167            with contextlib.suppress(ValidationError):
168                messages.append(AirbyteMessage.parse_raw(line))
169        self._airbyte_messages = messages
170        self._messages_loaded = True
171        return messages

Parse and return all Airbyte messages from stdout.

configured_streams: list[str]
173    @property
174    def configured_streams(self) -> list[str]:
175        """Get list of configured stream names."""
176        if not self.configured_catalog:
177            return []
178        return [stream.stream.name for stream in self.configured_catalog.streams]

Get list of configured stream names.

def get_records( self) -> Iterator[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]:
180    def get_records(self) -> Iterator[AirbyteMessage]:
181        """Iterate over record messages."""
182        for message in self.airbyte_messages:
183            if message.type is AirbyteMessageType.RECORD:
184                yield message

Iterate over record messages.

def get_records_per_stream( self, stream: str) -> Iterator[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]:
186    def get_records_per_stream(self, stream: str) -> Iterator[AirbyteMessage]:
187        """Get records for a specific stream."""
188        for message in self.get_records():
189            if message.record.stream == stream:
190                yield message

Get records for a specific stream.

def get_states( self) -> Iterator[airbyte_protocol.models.airbyte_protocol.AirbyteMessage]:
192    def get_states(self) -> Iterator[AirbyteMessage]:
193        """Iterate over state messages."""
194        for message in self.airbyte_messages:
195            if message.type is AirbyteMessageType.STATE:
196                yield message

Iterate over state messages.

def get_message_count_per_type(self) -> dict[airbyte_protocol.models.airbyte_protocol.Type, int]:
198    def get_message_count_per_type(self) -> dict[AirbyteMessageType, int]:
199        """Count messages by type."""
200        counts: dict[AirbyteMessageType, int] = defaultdict(int)
201        for message in self.airbyte_messages:
202            counts[message.type] += 1
203        return dict(counts)

Count messages by type.

def get_record_count_per_stream(self) -> dict[str, int]:
205    def get_record_count_per_stream(self) -> dict[str, int]:
206        """Count records by stream name.
207
208        Returns:
209            Dictionary mapping stream names to record counts.
210        """
211        counts: dict[str, int] = defaultdict(int)
212        for message in self.get_records():
213            counts[message.record.stream] += 1
214        return dict(counts)

Count records by stream name.

Returns:

Dictionary mapping stream names to record counts.

def get_catalog(self) -> airbyte_protocol.models.airbyte_protocol.AirbyteCatalog | None:
216    def get_catalog(self) -> AirbyteCatalog | None:
217        """Get discovered catalog from messages."""
218        for message in self.airbyte_messages:
219            if message.type is AirbyteMessageType.CATALOG:
220                return message.catalog
221        return None

Get discovered catalog from messages.

def get_spec(self) -> typing.Any | None:
223    def get_spec(self) -> Any | None:
224        """Get connector spec from messages."""
225        for message in self.airbyte_messages:
226            if message.type is AirbyteMessageType.SPEC:
227                return message.spec
228        return None

Get connector spec from messages.

def get_connection_status(self) -> typing.Any | None:
230    def get_connection_status(self) -> Any | None:
231        """Get connection status from check command."""
232        for message in self.airbyte_messages:
233            if message.type is AirbyteMessageType.CONNECTION_STATUS:
234                return message.connectionStatus
235        return None

Get connection status from check command.

def is_check_successful(self) -> bool:
237    def is_check_successful(self) -> bool:
238        """Check if the check command was successful."""
239        status = self.get_connection_status()
240        if status is None:
241            return False
242        return status.status.value == "SUCCEEDED"

Check if the check command was successful.

def save_artifacts(self, output_dir: pathlib.Path) -> None:
244    def save_artifacts(self, output_dir: Path) -> None:
245        """Save execution artifacts to the output directory."""
246        output_dir.mkdir(parents=True, exist_ok=True)
247
248        airbyte_messages_dir = output_dir / "airbyte_messages"
249        airbyte_messages_dir.mkdir(parents=True, exist_ok=True)
250
251        messages_by_type: dict[str, list[str]] = defaultdict(list)
252        for message in self.airbyte_messages:
253            type_name = message.type.value.lower()
254            messages_by_type[type_name].append(message.model_dump_json())
255
256        for type_name, messages in messages_by_type.items():
257            file_path = airbyte_messages_dir / f"{type_name}.jsonl"
258            file_path.write_text("\n".join(messages))
259
260        # Save configured catalog (input) if available
261        if self.configured_catalog is not None:
262            catalog_path = output_dir / "configured_catalog.json"
263            catalog_path.write_text(self.configured_catalog.model_dump_json(indent=2))
264            self.logger.info(f"Saved configured catalog to {catalog_path}")
265
266        self.logger.info(f"Artifacts saved to {output_dir}")

Save execution artifacts to the output directory.

class SecretRetrievalError(builtins.Exception):
class SecretRetrievalError(Exception):
    """Signals that unmasked connection secrets could not be obtained.

    Raised when USE_CONNECTION_SECRET_RETRIEVER is enabled and the secrets
    are nevertheless unavailable, for instance because of EU data residency
    restrictions.
    """

Raised when secret retrieval fails.

This exception is raised when USE_CONNECTION_SECRET_RETRIEVER is enabled but secrets cannot be retrieved (e.g., EU data residency restrictions).

class TargetOrControl(enum.Enum):
class TargetOrControl(Enum):
    """Role of a connector in a test run.

    ``TARGET`` marks the new version being evaluated and ``CONTROL`` the
    baseline version it is compared against.
    """

    TARGET = "target"  # the new version
    CONTROL = "control"  # the baseline version

Indicates whether a connector is the target (new) or control (baseline) version.

TARGET = <TargetOrControl.TARGET: 'target'>
CONTROL = <TargetOrControl.CONTROL: 'control'>
def enrich_config_with_secrets( connection_data: ConnectionData, retrieval_reason: str = 'MCP live tests', raise_on_failure: bool = True) -> ConnectionData:
def enrich_config_with_secrets(
    connection_data: ConnectionData,
    retrieval_reason: str = "MCP live tests",
    raise_on_failure: bool = True,
) -> ConnectionData:
    """Swap the config of ``connection_data`` for its unmasked counterpart.

    The unmasked config is looked up with ``retrieve_unmasked_config`` using
    the connection ID. The input object is never modified: on success a copy
    carrying the unmasked config is returned.

    Args:
        connection_data: Connection data whose config should be replaced
            (typically fetched from the public Cloud API, with masked secrets).
        retrieval_reason: Reason passed along with the lookup (for audit logging).
        raise_on_failure: When True (default), a failed lookup raises
            SecretRetrievalError. When False, the original ``connection_data``
            is returned unchanged, still with masked secrets (legacy behavior).

    Returns:
        A new ConnectionData with the unmasked config, or the original object
        when the lookup failed and ``raise_on_failure`` is False.

    Raises:
        SecretRetrievalError: If the lookup yields nothing and
            ``raise_on_failure`` is True (e.g., due to EU data residency
            restrictions).
    """
    connection_id = connection_data.connection_id
    secrets_config = retrieve_unmasked_config(
        connection_id=connection_id,
        retrieval_reason=retrieval_reason,
    )

    # Success path first: hand back a copy carrying the unmasked config.
    if secrets_config is not None:
        logger.info(
            f"Successfully enriched config with unmasked secrets for "
            f"{connection_id}"
        )
        return replace(connection_data, config=secrets_config)

    failure_message = (
        "Could not retrieve unmasked secrets for connection "
        f"{connection_id}. This may be due to EU data "
        "residency restrictions or database connectivity issues. "
        "The connection's credentials cannot be used for regression testing."
    )
    # The failure is logged in both modes, whether or not it is raised.
    logger.warning(failure_message)
    if not raise_on_failure:
        return connection_data
    raise SecretRetrievalError(failure_message)

Enrich connection data with unmasked secrets from internal retriever.

This function takes a ConnectionData object (typically from the public Cloud API with masked secrets) and replaces the config with unmasked secrets from the internal connection-retriever.

Arguments:
  • connection_data: The connection data to enrich.
  • retrieval_reason: Reason for retrieval (for audit logging).
  • raise_on_failure: If True (default), raise SecretRetrievalError when secrets cannot be retrieved. If False, return the original connection_data with masked secrets (legacy behavior).
Returns:

A new ConnectionData with unmasked config.

Raises:
  • SecretRetrievalError: If raise_on_failure is True and secrets cannot be retrieved (e.g., due to EU data residency restrictions).
def fetch_connection_data( connection_id: str, client_id: str | None = None, client_secret: str | None = None) -> ConnectionData:
 71def fetch_connection_data(
 72    connection_id: str,
 73    client_id: str | None = None,
 74    client_secret: str | None = None,
 75) -> ConnectionData:
 76    """Fetch connection configuration and catalog from Airbyte Cloud.
 77
 78    Args:
 79        connection_id: The connection ID to fetch data for.
 80        client_id: Airbyte Cloud client ID (defaults to env var).
 81        client_secret: Airbyte Cloud client secret (defaults to env var).
 82
 83    Returns:
 84        ConnectionData with config and catalog.
 85
 86    Raises:
 87        PyAirbyteInputError: If the API request fails.
 88    """
 89    client_id = client_id or os.getenv("AIRBYTE_CLOUD_CLIENT_ID")
 90    client_secret = client_secret or os.getenv("AIRBYTE_CLOUD_CLIENT_SECRET")
 91
 92    if not client_id or not client_secret:
 93        raise PyAirbyteInputError(
 94            message="Missing Airbyte Cloud credentials",
 95            context={
 96                "hint": "Set AIRBYTE_CLOUD_CLIENT_ID and AIRBYTE_CLOUD_CLIENT_SECRET env vars"
 97            },
 98        )
 99
100    access_token = _get_access_token(client_id, client_secret)
101    public_api_root = constants.CLOUD_API_ROOT
102    headers = {
103        "Authorization": f"Bearer {access_token}",
104        "Content-Type": "application/json",
105    }
106
107    # Get connection details
108    conn_response = requests.get(
109        f"{public_api_root}/connections/{connection_id}",
110        headers=headers,
111        timeout=30,
112    )
113
114    if conn_response.status_code != 200:
115        raise PyAirbyteInputError(
116            message=f"Failed to get connection: {conn_response.status_code}",
117            context={"connection_id": connection_id, "response": conn_response.text},
118        )
119
120    conn_data = conn_response.json()
121    source_id = conn_data["sourceId"]
122
123    # Get source details (includes config)
124    source_response = requests.get(
125        f"{public_api_root}/sources/{source_id}",
126        headers=headers,
127        timeout=30,
128    )
129
130    if source_response.status_code != 200:
131        raise PyAirbyteInputError(
132            message=f"Failed to get source: {source_response.status_code}",
133            context={"source_id": source_id, "response": source_response.text},
134        )
135
136    source_data = source_response.json()
137    source_definition_id = source_data.get("definitionId", "")
138
139    # Try to get docker repository and image tag from source definition version
140    docker_repository = None
141    docker_image_tag = None
142    if source_definition_id:
143        try:
144            # Use the Config API to get version info for the source
145            config_api_root = constants.CLOUD_CONFIG_API_ROOT
146            version_response = requests.post(
147                f"{config_api_root}/actor_definition_versions/get_for_source",
148                json={"sourceId": source_id},
149                headers=headers,
150                timeout=30,
151            )
152            if version_response.status_code == 200:
153                version_data = version_response.json()
154                docker_repository = version_data.get("dockerRepository")
155                docker_image_tag = version_data.get("dockerImageTag")
156        except Exception:
157            # Non-fatal: we can still proceed without docker info
158            pass
159
160    # Build configured catalog from connection streams
161    streams_config = conn_data.get("configurations", {}).get("streams", [])
162    stream_names = [s["name"] for s in streams_config]
163
164    # Build Airbyte protocol catalog format
165    catalog = _build_configured_catalog(
166        streams_config, source_id, headers, public_api_root
167    )
168
169    return ConnectionData(
170        connection_id=connection_id,
171        source_id=source_id,
172        source_name=source_data.get("name", ""),
173        source_definition_id=source_definition_id,
174        config=source_data.get("configuration", {}),
175        catalog=catalog,
176        stream_names=stream_names,
177        workspace_id=conn_data.get("workspaceId"),
178        docker_repository=docker_repository,
179        docker_image_tag=docker_image_tag,
180    )

Fetch connection configuration and catalog from Airbyte Cloud.

Arguments:
  • connection_id: The connection ID to fetch data for.
  • client_id: Airbyte Cloud client ID (defaults to env var).
  • client_secret: Airbyte Cloud client secret (defaults to env var).
Returns:

ConnectionData with config and catalog.

Raises:
  • PyAirbyteInputError: If the API request fails.
def is_secret_retriever_enabled() -> bool:
def is_secret_retriever_enabled() -> bool:
    """Report whether secret retrieval has been switched on via the environment.

    Reads the environment variable named by ``ENV_USE_SECRET_RETRIEVER``
    (documented as USE_CONNECTION_SECRET_RETRIEVER) and compares its value
    case-insensitively. An unset variable counts as disabled.

    Returns:
        True if the value is one of ``"true"``, ``"1"`` or ``"yes"``.
    """
    raw_setting = os.getenv(ENV_USE_SECRET_RETRIEVER, "")
    return raw_setting.lower() in {"true", "1", "yes"}

Check if secret retrieval is enabled via environment variable.

Returns:

True if USE_CONNECTION_SECRET_RETRIEVER is set to a truthy value.

def retrieve_unmasked_config( connection_id: str, retrieval_reason: str = 'MCP live tests') -> dict | None:
 72def retrieve_unmasked_config(
 73    connection_id: str,
 74    retrieval_reason: str = "MCP live tests",
 75) -> dict | None:
 76    """Retrieve unmasked source config from vendored connection-retriever.
 77
 78    This function directly queries the internal Postgres database to get
 79    the source configuration with unmasked secrets.
 80
 81    Args:
 82        connection_id: The Airbyte Cloud connection ID.
 83        retrieval_reason: Reason for retrieval (for audit logging).
 84
 85    Returns:
 86        The unmasked source config dict, or None if retrieval fails.
 87    """
 88    # Only request the source config - that's all we need for secrets
 89    requested_objects = [ConnectionObject.SOURCE_CONFIG]
 90
 91    candidates = retrieve_objects(
 92        connection_objects=requested_objects,
 93        retrieval_reason=retrieval_reason,
 94        connection_id=connection_id,
 95    )
 96
 97    if not candidates:
 98        logger.warning(
 99            f"No connection data found for connection ID {connection_id} "
100            "via connection-retriever"
101        )
102        return None
103
104    candidate = candidates[0]
105    if candidate.source_config:
106        return dict(candidate.source_config)
107
108    return None

Retrieve unmasked source config from vendored connection-retriever.

This function directly queries the internal Postgres database to get the source configuration with unmasked secrets.

Arguments:
  • connection_id: The Airbyte Cloud connection ID.
  • retrieval_reason: Reason for retrieval (for audit logging).
Returns:

The unmasked source config dict, or None if retrieval fails.

def should_use_secret_retriever() -> bool:
def should_use_secret_retriever() -> bool:
    """Tell whether the secret retriever should be used.

    This is a straight delegation to ``is_secret_retriever_enabled``.

    Returns:
        Whatever ``is_secret_retriever_enabled()`` reports.
    """
    retriever_enabled = is_secret_retriever_enabled()
    return retriever_enabled

Check if secret retrieval should be used.

Returns:

True if USE_CONNECTION_SECRET_RETRIEVER env var is set to a truthy value.