airbyte_ops_mcp.prod_db_access

Prod DB Access module for querying Airbyte Cloud Prod DB Replica.

This module provides:

  • sql.py: SQL query templates and schema documentation
  • db_engine.py: Database connection and engine management
  • queries.py: Query execution functions
 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
 3
 4This module provides:
 5- sql.py: SQL query templates and schema documentation
 6- db_engine.py: Database connection and engine management
 7- queries.py: Query execution functions
 8"""
 9
10from airbyte_ops_mcp.prod_db_access.db_engine import get_pool
11from airbyte_ops_mcp.prod_db_access.sql import (
12    SELECT_ACTORS_PINNED_TO_VERSION,
13    SELECT_CONNECTIONS_BY_CONNECTOR,
14    SELECT_CONNECTOR_VERSIONS,
15    SELECT_DATAPLANES_LIST,
16    SELECT_NEW_CONNECTOR_RELEASES,
17    SELECT_ORG_WORKSPACES,
18    SELECT_SUCCESSFUL_SYNCS_FOR_VERSION,
19    SELECT_SYNC_RESULTS_FOR_VERSION,
20    SELECT_WORKSPACE_INFO,
21)
22
23__all__ = [
24    "SELECT_ACTORS_PINNED_TO_VERSION",
25    "SELECT_CONNECTIONS_BY_CONNECTOR",
26    "SELECT_CONNECTOR_VERSIONS",
27    "SELECT_DATAPLANES_LIST",
28    "SELECT_NEW_CONNECTOR_RELEASES",
29    "SELECT_ORG_WORKSPACES",
30    "SELECT_SUCCESSFUL_SYNCS_FOR_VERSION",
31    "SELECT_SYNC_RESULTS_FOR_VERSION",
32    "SELECT_WORKSPACE_INFO",
33    "get_pool",
34]
SELECT_ACTORS_PINNED_TO_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS = <sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST = <sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES = <sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES = <sqlalchemy.sql.elements.TextClause object>
SELECT_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO = <sqlalchemy.sql.elements.TextClause object>
def get_pool( gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
def get_pool(
    gsm_client: secretmanager.SecretManagerServiceClient,
) -> sqlalchemy.Engine:
    """Get a SQLAlchemy connection pool for the Airbyte Cloud database.

    This function supports two connection modes:
    1. Direct connection via Cloud SQL Python Connector (default, requires VPC/Tailscale)
    2. Connection via Cloud SQL Auth Proxy (when CI or USE_CLOUD_SQL_PROXY env var is set)

    For proxy mode, start the proxy first with:
        airbyte-ops cloud db start-proxy

    Environment variables:
        CI: If set, uses proxy connection mode
        USE_CLOUD_SQL_PROXY: If set, uses proxy connection mode
        DB_PORT: Port for proxy connection (default: 15432)

    Args:
        gsm_client: GCP Secret Manager client for retrieving credentials.
            The secret is expected to be a JSON object; proxy mode reads its
            "pg_user", "pg_password" and "database_name" keys.

    Returns:
        SQLAlchemy Engine connected to the Prod DB Replica.

    Raises:
        VpnNotConnectedError: If direct mode is used but no VPN/proxy is detected.
        CloudSqlProxyNotRunningError: If proxy mode is enabled but the proxy is
            not running.
        json.JSONDecodeError: If the stored connection-details secret is not
            valid JSON.
        KeyError: In proxy mode, if the connection details lack one of the
            keys listed above.
        ValueError: If DB_PORT is set to a non-integer value.
    """
    # Fail fast if no VPN or proxy is available
    _check_vpn_or_proxy_available()

    pg_connection_details = json.loads(
        _get_secret_value(
            gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID
        )
    )

    if os.getenv("CI") or os.getenv("USE_CLOUD_SQL_PROXY"):
        # Connect via Cloud SQL Auth Proxy, running on localhost.
        # Port can be configured via DB_PORT env var (default: DEFAULT_CLOUD_SQL_PROXY_PORT).
        host = "127.0.0.1"
        port = int(os.getenv("DB_PORT", str(DEFAULT_CLOUD_SQL_PROXY_PORT)))

        # Fail fast if proxy is not running
        _check_proxy_is_running(host, port)

        # Build the URL from its parts instead of interpolating a string.
        # URL.create takes username/password literally, so credentials with
        # reserved characters ("@", ":", "/", "#", "?", "%") neither break URL
        # parsing nor get percent-decoded into a different password.
        proxy_url = sqlalchemy.engine.URL.create(
            drivername=f"postgresql+{PG_DRIVER}",
            username=pg_connection_details["pg_user"],
            password=pg_connection_details["pg_password"],
            host=host,
            port=port,
            database=pg_connection_details["database_name"],
        )
        return sqlalchemy.create_engine(proxy_url)

    # Default: Connect via Cloud SQL Python Connector (requires VPC/Tailscale access).
    # NOTE(review): when `creator` is supplied, SQLAlchemy calls it in place of
    # its default DBAPI connect function, which is what normally consumes
    # `connect_args`. The timeout below is therefore probably never applied.
    # It likely needs to be passed inside get_database_creator() instead —
    # confirm against that helper.
    return sqlalchemy.create_engine(
        f"postgresql+{PG_DRIVER}://",
        creator=get_database_creator(pg_connection_details),
        connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT},
    )

Get a SQLAlchemy connection pool for the Airbyte Cloud database.

This function supports two connection modes:

  1. Direct connection via Cloud SQL Python Connector (default, requires VPC/Tailscale)
  2. Connection via Cloud SQL Auth Proxy (when CI or USE_CLOUD_SQL_PROXY env var is set)

For proxy mode, start the proxy first with: `airbyte-ops cloud db start-proxy`

Environment variables:

  • CI: If set, uses proxy connection mode
  • USE_CLOUD_SQL_PROXY: If set, uses proxy connection mode
  • DB_PORT: Port for proxy connection (default: 15432)

Raises:
  • VpnNotConnectedError: If direct mode is used but no VPN/proxy is detected
  • CloudSqlProxyNotRunningError: If proxy mode is enabled but the proxy is not running
Arguments:
  • gsm_client: GCP Secret Manager client for retrieving credentials
Returns:

SQLAlchemy Engine connected to the Prod DB Replica