airbyte_ops_mcp.connection_config_retriever
Connection config retriever module.
This module provides functionality to retrieve unmasked connection configuration from Airbyte Cloud's internal database, including secret resolution from GCP Secret Manager and audit logging to GCP Cloud Logging.
Refactored from: live_tests/_connection_retriever. Original source: airbyte-platform-internal/tools/connection-retriever.
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Connection config retriever module. 3 4This module provides functionality to retrieve unmasked connection configuration 5from Airbyte Cloud's internal database, including secret resolution from GCP 6Secret Manager and audit logging to GCP Cloud Logging. 7 8Refactored from: live_tests/_connection_retriever 9Original source: airbyte-platform-internal/tools/connection-retriever 10""" 11 12from airbyte_ops_mcp.connection_config_retriever.retrieval import ( 13 ConnectionNotFoundError, 14 RetrievalMetadata, 15 TestingCandidate, 16 retrieve_objects, 17) 18from airbyte_ops_mcp.constants import ConnectionObject 19 20__all__ = [ 21 "ConnectionNotFoundError", 22 "ConnectionObject", 23 "RetrievalMetadata", 24 "TestingCandidate", 25 "retrieve_objects", 26]
class ConnectionNotFoundError(Exception):
    """Signal that the requested connection could not be located.

    Carries no extra state; the message passed at construction describes
    which lookup failed.
    """
Raised when a connection cannot be found.
class ConnectionObject(Enum):
    """Enumerate the kinds of connection object a caller may request.

    Each member's value is the hyphenated string form of its name.
    """

    # The connection itself and the identifiers of its two ends.
    CONNECTION = "connection"
    SOURCE_ID = "source-id"
    DESTINATION_ID = "destination-id"

    # Configuration payloads.
    DESTINATION_CONFIG = "destination-config"
    SOURCE_CONFIG = "source-config"

    # Catalogs and state.
    CATALOG = "catalog"
    CONFIGURED_CATALOG = "configured-catalog"
    STATE = "state"

    # Workspace and docker image references.
    WORKSPACE_ID = "workspace-id"
    DESTINATION_DOCKER_IMAGE = "destination-docker-image"
    SOURCE_DOCKER_IMAGE = "source-docker-image"
Types of connection objects that can be retrieved.
@dataclass
class RetrievalMetadata:
    """Describe a single retrieval so that it can be recorded for auditing.

    Attributes:
        connection_id: Identifier of the connection the retrieval targets.
        connection_object: Which kind of connection object was requested.
        retrieval_reason: Caller-supplied justification for the retrieval.
    """

    connection_id: str
    connection_object: ConnectionObject
    retrieval_reason: str
Metadata about a retrieval operation for audit logging.
112@dataclass 113class TestingCandidate: 114 """A connection candidate for testing.""" 115 116 connection_id: str 117 connection_url: str | None = None 118 stream_count: int | None = None 119 last_attempt_duration_in_microseconds: int | None = None 120 is_internal: bool | None = None 121 streams_with_data: list[str] | None = None 122 123 # ConnectionObject fields 124 connection: str | None = None 125 source_id: str | None = None 126 destination_id: str | None = None 127 128 destination_config: Mapping | None = None 129 source_config: Mapping | None = None 130 catalog: Mapping | None = None 131 configured_catalog: Mapping | None = None 132 state: list[Mapping] | None = None 133 134 workspace_id: str | None = None 135 destination_docker_image: str | None = None 136 source_docker_image: str | None = None 137 138 def update(self, **kwargs: Any) -> None: 139 """Update fields from keyword arguments.""" 140 for key, value in kwargs.items(): 141 if hasattr(self, key): 142 setattr(self, key, value) 143 else: 144 raise AttributeError( 145 f"{key} is not a valid field of {self.__class__.__name__}" 146 )
A connection candidate for testing.
def update(self, **kwargs: Any) -> None:
    """Update dataclass fields from keyword arguments.

    All keys are validated before anything is assigned, so a failed call
    leaves the instance untouched.

    Args:
        **kwargs: Field names mapped to their new values.

    Raises:
        AttributeError: If any key is not a declared dataclass field.
    """
    # Local import keeps this method self-contained.
    from dataclasses import fields

    # Check against the declared fields rather than ``hasattr``: ``hasattr``
    # is also true for methods (e.g. ``update``) and dunder attributes, which
    # would let a caller overwrite them silently.
    field_names = {field.name for field in fields(self)}
    for key in kwargs:
        if key not in field_names:
            raise AttributeError(
                f"{key} is not a valid field of {self.__class__.__name__}"
            )

    for key, value in kwargs.items():
        setattr(self, key, value)
Update fields from keyword arguments.
def retrieve_objects(
    connection_objects: list[ConnectionObject],
    retrieval_reason: str,
    connection_id: str,
) -> list[TestingCandidate]:
    """Fetch the requested connection objects for one connection ID.

    Simplified variant: lookup is by ``connection_id`` only. Discovering
    testing candidates by docker image is tracked in issue #91.

    Args:
        connection_objects: The kinds of object to retrieve.
        retrieval_reason: Reason forwarded to every ``retrieve_object`` call.
        connection_id: Connection to retrieve the objects for.

    Returns:
        The candidates whose objects were retrieved. A candidate whose
        dataplane-group query reports EU is skipped with a warning, so the
        list may be empty.

    Raises:
        ConnectionNotFoundError: If the dataplane-group query returns no row
            for the connection.
    """
    candidates = [TestingCandidate(connection_id=connection_id)]

    secret_manager_client = get_secret_manager_client()
    pool = get_pool(secret_manager_client)

    permitted: list[TestingCandidate] = []
    with pool.connect() as db_conn:
        for candidate in candidates:
            eu_row = db_conn.execute(
                SELECT_ON_CONNECTION_DATAPLANE_GROUP_IS_EU,
                parameters={"connection_id": candidate.connection_id},
            ).first()

            # No row at all: the connection is unknown, so abort the retrieval.
            if eu_row is None:
                raise ConnectionNotFoundError(
                    f"Credentials were not found for connection ID {candidate.connection_id}."
                )

            # EU-resident connections are left out of the result instead of failing.
            if eu_row[0] is True:
                LOGGER.warning(
                    f"Credential retrieval not permitted; the data residency for "
                    f"connection ID {candidate.connection_id} is within the EU. "
                    f"Candidate will be removed from the list"
                )
                continue

            # Enum values are hyphenated; the candidate's fields use underscores.
            retrieved: dict[str, Any] = {}
            for connection_object in connection_objects:
                field_name = connection_object.value.replace("-", "_")
                retrieved[field_name] = retrieve_object(
                    candidate.connection_id,
                    connection_object,
                    retrieval_reason,
                    db_conn,
                    secret_manager_client,
                )
            candidate.update(**retrieved)
            permitted.append(candidate)

    return permitted
Retrieve connection objects for a given connection ID.
This is a simplified version that only supports retrieval by connection_id. For testing candidate discovery by docker image, see issue #91.