airbyte_ops_mcp.prod_db_access
Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
This module provides:
- sql.py: SQL query templates and schema documentation
- db_engine.py: Database connection and engine management
- queries.py: Query execution functions
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica.

This module provides:
- sql.py: SQL query templates and schema documentation
- db_engine.py: Database connection and engine management
- queries.py: Query execution functions
"""

from airbyte_ops_mcp.prod_db_access.db_engine import get_pool
from airbyte_ops_mcp.prod_db_access.sql import (
    SELECT_ACTORS_PINNED_TO_VERSION,
    SELECT_CONNECTIONS_BY_CONNECTOR,
    SELECT_CONNECTOR_VERSIONS,
    SELECT_DATAPLANES_LIST,
    SELECT_NEW_CONNECTOR_RELEASES,
    SELECT_ORG_WORKSPACES,
    SELECT_SUCCESSFUL_SYNCS_FOR_VERSION,
    SELECT_SYNC_RESULTS_FOR_VERSION,
    SELECT_WORKSPACE_INFO,
)

__all__ = [
    "SELECT_ACTORS_PINNED_TO_VERSION",
    "SELECT_CONNECTIONS_BY_CONNECTOR",
    "SELECT_CONNECTOR_VERSIONS",
    "SELECT_DATAPLANES_LIST",
    "SELECT_NEW_CONNECTOR_RELEASES",
    "SELECT_ORG_WORKSPACES",
    "SELECT_SUCCESSFUL_SYNCS_FOR_VERSION",
    "SELECT_SYNC_RESULTS_FOR_VERSION",
    "SELECT_WORKSPACE_INFO",
    "get_pool",
]
SELECT_ACTORS_PINNED_TO_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS = <sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST = <sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES = <sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES = <sqlalchemy.sql.elements.TextClause object>
SELECT_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO = <sqlalchemy.sql.elements.TextClause object>
def get_pool(gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
def get_pool(
    gsm_client: secretmanager.SecretManagerServiceClient,
) -> sqlalchemy.Engine:
    """Get a SQLAlchemy connection pool for the Airbyte Cloud database.

    This function supports two connection modes:
    1. Direct connection via Cloud SQL Python Connector (default, requires VPC/Tailscale)
    2. Connection via Cloud SQL Auth Proxy (when CI or USE_CLOUD_SQL_PROXY env var is set)

    For proxy mode, start the proxy with:
        airbyte-ops cloud db start-proxy

    Environment variables:
        CI: If set, uses proxy connection mode
        USE_CLOUD_SQL_PROXY: If set, uses proxy connection mode
        DB_PORT: Port for proxy connection (default: 15432)

    Raises:
        VpnNotConnectedError: If direct mode is used but no VPN/proxy is detected
        CloudSqlProxyNotRunningError: If proxy mode is enabled but the proxy is not running

    Args:
        gsm_client: GCP Secret Manager client for retrieving credentials

    Returns:
        SQLAlchemy Engine connected to the Prod DB Replica
    """
    # Fail fast if no VPN or proxy is available
    _check_vpn_or_proxy_available()

    pg_connection_details = json.loads(
        _get_secret_value(
            gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID
        )
    )

    if os.getenv("CI") or os.getenv("USE_CLOUD_SQL_PROXY"):
        # Connect via Cloud SQL Auth Proxy, running on localhost
        # Port can be configured via DB_PORT env var (default: DEFAULT_CLOUD_SQL_PROXY_PORT)
        host = "127.0.0.1"
        port = int(os.getenv("DB_PORT", str(DEFAULT_CLOUD_SQL_PROXY_PORT)))

        # Fail fast if proxy is not running
        _check_proxy_is_running(host, port)

        return sqlalchemy.create_engine(
            f"postgresql+{PG_DRIVER}://{pg_connection_details['pg_user']}:{pg_connection_details['pg_password']}@{host}:{port}/{pg_connection_details['database_name']}",
        )

    # Default: Connect via Cloud SQL Python Connector (requires VPC/Tailscale access)
    # Use a timeout to fail faster if the connection can't be established
    return sqlalchemy.create_engine(
        f"postgresql+{PG_DRIVER}://",
        creator=get_database_creator(pg_connection_details),
        connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT},
    )
Get a SQLAlchemy connection pool for the Airbyte Cloud database.
This function supports two connection modes:
- Direct connection via Cloud SQL Python Connector (default, requires VPC/Tailscale)
- Connection via Cloud SQL Auth Proxy (when CI or USE_CLOUD_SQL_PROXY env var is set)
For proxy mode, start the proxy with: airbyte-ops cloud db start-proxy
Environment variables:
- CI: If set, uses proxy connection mode
- USE_CLOUD_SQL_PROXY: If set, uses proxy connection mode
- DB_PORT: Port for proxy connection (default: 15432)
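The mode selection driven by these environment variables can be sketched in isolation. The helper below is hypothetical and not part of airbyte_ops_mcp: `resolve_connection_mode`, `ConnectionMode`, and `DEFAULT_PROXY_PORT` are names introduced here for illustration, and the sketch only mirrors the `os.getenv` checks visible in the `get_pool` source.

```python
from typing import Mapping, NamedTuple, Optional

# Assumption: mirrors the documented default of 15432. The real module reads
# this value from a constant named DEFAULT_CLOUD_SQL_PROXY_PORT.
DEFAULT_PROXY_PORT = 15432


class ConnectionMode(NamedTuple):
    """Hypothetical result type, not part of airbyte_ops_mcp."""

    use_proxy: bool
    host: Optional[str]
    port: Optional[int]


def resolve_connection_mode(env: Mapping[str, str]) -> ConnectionMode:
    """Hypothetical helper that reproduces the env-var checks in get_pool."""
    # Any non-empty value counts as "set", so CI=false still selects proxy mode.
    if env.get("CI") or env.get("USE_CLOUD_SQL_PROXY"):
        port = int(env.get("DB_PORT", str(DEFAULT_PROXY_PORT)))
        return ConnectionMode(True, "127.0.0.1", port)
    # Direct mode: host and port are handled by the Cloud SQL Python Connector.
    return ConnectionMode(False, None, None)
```

Passing `os.environ` as `env` reproduces the live behavior. Note that the check is a truthiness test on the string value, so only an unset or empty variable falls through to direct mode.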
Raises:
- VpnNotConnectedError: If direct mode is used but no VPN/proxy is detected
- CloudSqlProxyNotRunningError: If proxy mode is enabled but the proxy is not running
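The body of `_check_proxy_is_running` is not shown on this page. One plausible way to implement such a fail-fast check is a short TCP probe against the proxy address, sketched below. This is an assumption about the technique, not the module's actual implementation, and `is_port_open` is a hypothetical name.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds.

    Hypothetical helper for illustration; not part of airbyte_ops_mcp.
    """
    try:
        # create_connection raises OSError (for example ConnectionRefusedError
        # or a timeout) when nothing is accepting connections on the port.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

A caller could probe `is_port_open("127.0.0.1", 15432)` before building the engine and raise an error such as CloudSqlProxyNotRunningError when it returns False, which surfaces a missing proxy immediately instead of on the first query.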
Arguments:
- gsm_client: GCP Secret Manager client for retrieving credentials
Returns:
SQLAlchemy Engine connected to the Prod DB Replica