airbyte_ops_mcp.connection_config_retriever

Connection config retriever module.

This module provides functionality to retrieve unmasked connection configuration from Airbyte Cloud's internal database, including secret resolution from GCP Secret Manager and audit logging to GCP Cloud Logging.

Refactored from: live_tests/_connection_retriever. Original source: airbyte-platform-internal/tools/connection-retriever.

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Connection config retriever module.
 3
 4This module provides functionality to retrieve unmasked connection configuration
 5from Airbyte Cloud's internal database, including secret resolution from GCP
 6Secret Manager and audit logging to GCP Cloud Logging.
 7
 8Refactored from: live_tests/_connection_retriever
 9Original source: airbyte-platform-internal/tools/connection-retriever
10"""
11
12from airbyte_ops_mcp.connection_config_retriever.retrieval import (
13    ConnectionNotFoundError,
14    RetrievalMetadata,
15    TestingCandidate,
16    retrieve_objects,
17)
18from airbyte_ops_mcp.constants import ConnectionObject
19
# Public names of this package: everything imported above from the retrieval
# and constants modules is re-exported here.
__all__ = [
    "ConnectionNotFoundError",
    "ConnectionObject",
    "RetrievalMetadata",
    "TestingCandidate",
    "retrieve_objects",
]
class ConnectionNotFoundError(builtins.Exception):
class ConnectionNotFoundError(Exception):
    """Error raised when the requested connection cannot be found."""

Raised when a connection cannot be found.

class ConnectionObject(enum.Enum):
class ConnectionObject(Enum):
    """Types of connection objects that can be retrieved.

    Every member's value is a lowercase, hyphen-separated string. Callers that
    need an attribute-style name replace ``-`` with ``_`` in that value.
    """

    CONNECTION = "connection"
    SOURCE_ID = "source-id"
    DESTINATION_ID = "destination-id"
    DESTINATION_CONFIG = "destination-config"
    SOURCE_CONFIG = "source-config"
    CATALOG = "catalog"
    CONFIGURED_CATALOG = "configured-catalog"
    STATE = "state"
    WORKSPACE_ID = "workspace-id"
    DESTINATION_DOCKER_IMAGE = "destination-docker-image"
    SOURCE_DOCKER_IMAGE = "source-docker-image"

Types of connection objects that can be retrieved.

CONNECTION = <ConnectionObject.CONNECTION: 'connection'>
SOURCE_ID = <ConnectionObject.SOURCE_ID: 'source-id'>
DESTINATION_ID = <ConnectionObject.DESTINATION_ID: 'destination-id'>
DESTINATION_CONFIG = <ConnectionObject.DESTINATION_CONFIG: 'destination-config'>
SOURCE_CONFIG = <ConnectionObject.SOURCE_CONFIG: 'source-config'>
CATALOG = <ConnectionObject.CATALOG: 'catalog'>
CONFIGURED_CATALOG = <ConnectionObject.CONFIGURED_CATALOG: 'configured-catalog'>
STATE = <ConnectionObject.STATE: 'state'>
WORKSPACE_ID = <ConnectionObject.WORKSPACE_ID: 'workspace-id'>
DESTINATION_DOCKER_IMAGE = <ConnectionObject.DESTINATION_DOCKER_IMAGE: 'destination-docker-image'>
SOURCE_DOCKER_IMAGE = <ConnectionObject.SOURCE_DOCKER_IMAGE: 'source-docker-image'>
@dataclass
class RetrievalMetadata:
@dataclass
class RetrievalMetadata:
    """Metadata about a retrieval operation for audit logging.

    Attributes:
        connection_id: ID of the connection the retrieval refers to.
        connection_object: The ``ConnectionObject`` type being retrieved.
        retrieval_reason: Reason given for performing the retrieval.
    """

    connection_id: str
    connection_object: ConnectionObject
    retrieval_reason: str

Metadata about a retrieval operation for audit logging.

RetrievalMetadata( connection_id: str, connection_object: ConnectionObject, retrieval_reason: str)
connection_id: str
connection_object: ConnectionObject
retrieval_reason: str
@dataclass
class TestingCandidate:
112@dataclass
113class TestingCandidate:
114    """A connection candidate for testing."""
115
116    connection_id: str
117    connection_url: str | None = None
118    stream_count: int | None = None
119    last_attempt_duration_in_microseconds: int | None = None
120    is_internal: bool | None = None
121    streams_with_data: list[str] | None = None
122
123    # ConnectionObject fields
124    connection: str | None = None
125    source_id: str | None = None
126    destination_id: str | None = None
127
128    destination_config: Mapping | None = None
129    source_config: Mapping | None = None
130    catalog: Mapping | None = None
131    configured_catalog: Mapping | None = None
132    state: list[Mapping] | None = None
133
134    workspace_id: str | None = None
135    destination_docker_image: str | None = None
136    source_docker_image: str | None = None
137
138    def update(self, **kwargs: Any) -> None:
139        """Update fields from keyword arguments."""
140        for key, value in kwargs.items():
141            if hasattr(self, key):
142                setattr(self, key, value)
143            else:
144                raise AttributeError(
145                    f"{key} is not a valid field of {self.__class__.__name__}"
146                )

A connection candidate for testing.

TestingCandidate( connection_id: str, connection_url: str | None = None, stream_count: int | None = None, last_attempt_duration_in_microseconds: int | None = None, is_internal: bool | None = None, streams_with_data: list[str] | None = None, connection: str | None = None, source_id: str | None = None, destination_id: str | None = None, destination_config: Optional[Mapping] = None, source_config: Optional[Mapping] = None, catalog: Optional[Mapping] = None, configured_catalog: Optional[Mapping] = None, state: list[typing.Mapping] | None = None, workspace_id: str | None = None, destination_docker_image: str | None = None, source_docker_image: str | None = None)
connection_id: str
connection_url: str | None = None
stream_count: int | None = None
last_attempt_duration_in_microseconds: int | None = None
is_internal: bool | None = None
streams_with_data: list[str] | None = None
connection: str | None = None
source_id: str | None = None
destination_id: str | None = None
destination_config: Optional[Mapping] = None
source_config: Optional[Mapping] = None
catalog: Optional[Mapping] = None
configured_catalog: Optional[Mapping] = None
state: list[typing.Mapping] | None = None
workspace_id: str | None = None
destination_docker_image: str | None = None
source_docker_image: str | None = None
def update(self, **kwargs: Any) -> None:
138    def update(self, **kwargs: Any) -> None:
139        """Update fields from keyword arguments."""
140        for key, value in kwargs.items():
141            if hasattr(self, key):
142                setattr(self, key, value)
143            else:
144                raise AttributeError(
145                    f"{key} is not a valid field of {self.__class__.__name__}"
146                )

Update fields from keyword arguments.

def retrieve_objects( connection_objects: list[ConnectionObject], retrieval_reason: str, connection_id: str) -> list[TestingCandidate]:
def retrieve_objects(
    connection_objects: list[ConnectionObject],
    retrieval_reason: str,
    connection_id: str,
) -> list[TestingCandidate]:
    """Fetch the requested objects for a single connection.

    Lookup is by ``connection_id`` only. Discovering testing candidates by
    docker image is not supported here; see issue #91.

    Args:
        connection_objects: Object types to retrieve for the connection.
        retrieval_reason: Reason passed to ``retrieve_object`` for each lookup.
        connection_id: ID of the connection to look up.

    Returns:
        A one-element list holding the populated candidate, or an empty list
        when the connection's data residency is within the EU.

    Raises:
        ConnectionNotFoundError: If the data-residency query returns no row
            for the connection.
    """
    candidate = TestingCandidate(connection_id=connection_id)

    secret_manager_client = get_secret_manager_client()
    pool = get_pool(secret_manager_client)

    with pool.connect() as db_conn:
        residency_row = db_conn.execute(
            SELECT_ON_CONNECTION_DATAPLANE_GROUP_IS_EU,
            parameters={"connection_id": candidate.connection_id},
        ).first()

        # No row at all: the connection is unknown.
        if residency_row is None:
            raise ConnectionNotFoundError(
                f"Credentials were not found for connection ID {candidate.connection_id}."
            )

        # EU-resident connections are dropped from the result, not failed.
        if residency_row[0] is True:
            LOGGER.warning(
                f"Credential retrieval not permitted; the data residency for "
                f"connection ID {candidate.connection_id} is within the EU. "
                f"Candidate will be removed from the list"
            )
            return []

        # Enum values are hyphenated; candidate field names use underscores.
        retrieved = {}
        for connection_object in connection_objects:
            field_name = connection_object.value.replace("-", "_")
            retrieved[field_name] = retrieve_object(
                candidate.connection_id,
                connection_object,
                retrieval_reason,
                db_conn,
                secret_manager_client,
            )
        candidate.update(**retrieved)

    return [candidate]

Retrieve connection objects for a given connection ID.

This is a simplified version that only supports retrieval by connection_id. For testing candidate discovery by docker image, see issue #91.