airbyte_ops_mcp.prod_db_access

Prod DB Access module for querying Airbyte Cloud Prod DB Replica.

This module provides:

  • sql.py: SQL query templates and schema documentation
  • db_engine.py: Database connection and engine management
  • queries.py: Query execution functions
 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
 3
 4This module provides:
 5- sql.py: SQL query templates and schema documentation
 6- db_engine.py: Database connection and engine management
 7- queries.py: Query execution functions
 8"""
 9
10from airbyte_ops_mcp.prod_db_access.db_engine import get_pool
11from airbyte_ops_mcp.prod_db_access.sql import (
12    SELECT_ACTORS_PINNED_TO_VERSION,
13    SELECT_CONNECTIONS_BY_CONNECTOR,
14    SELECT_CONNECTOR_VERSIONS,
15    SELECT_DATAPLANES_LIST,
16    SELECT_NEW_CONNECTOR_RELEASES,
17    SELECT_ORG_WORKSPACES,
18    SELECT_SUCCESSFUL_SYNCS_FOR_VERSION,
19    SELECT_SYNC_RESULTS_FOR_VERSION,
20    SELECT_WORKSPACE_INFO,
21)
22
23__all__ = [
24    "SELECT_ACTORS_PINNED_TO_VERSION",
25    "SELECT_CONNECTIONS_BY_CONNECTOR",
26    "SELECT_CONNECTOR_VERSIONS",
27    "SELECT_DATAPLANES_LIST",
28    "SELECT_NEW_CONNECTOR_RELEASES",
29    "SELECT_ORG_WORKSPACES",
30    "SELECT_SUCCESSFUL_SYNCS_FOR_VERSION",
31    "SELECT_SYNC_RESULTS_FOR_VERSION",
32    "SELECT_WORKSPACE_INFO",
33    "get_pool",
34]
SELECT_ACTORS_PINNED_TO_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS = <sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST = <sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES = <sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES = <sqlalchemy.sql.elements.TextClause object>
SELECT_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO = <sqlalchemy.sql.elements.TextClause object>
def get_pool(gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
def get_pool(
    gsm_client: secretmanager.SecretManagerServiceClient,
) -> sqlalchemy.Engine:
    """Build a SQLAlchemy engine (connection pool) for the Airbyte Cloud database.

    The Postgres connection details are fetched as a JSON secret through
    ``gsm_client`` and passed to ``get_database_creator``; the callable it
    returns is what the engine uses to open connections. The connection is
    made with the Cloud SQL Python Connector in public IP mode.

    Args:
        gsm_client: GCP Secret Manager client for retrieving credentials

    Returns:
        SQLAlchemy Engine connected to the Prod DB Replica
    """
    # The secret payload is a JSON document describing the Postgres connection.
    secret_payload = _get_secret_value(
        gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID
    )
    connection_details = json.loads(secret_payload)

    # The URL only selects dialect and driver; the creator opens connections.
    engine_url = f"postgresql+{PG_DRIVER}://"
    connection_factory = get_database_creator(connection_details)

    # NOTE(review): SQLAlchemy documents that a custom `creator` bypasses the
    # URL's connection parameters; confirm `connect_args` (the timeout) is
    # still honored on this path rather than silently ignored.
    return sqlalchemy.create_engine(
        engine_url,
        creator=connection_factory,
        connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT},
    )

Get a SQLAlchemy connection pool for the Airbyte Cloud database.

This function connects with the Cloud SQL Python Connector in public IP mode.

Arguments:
  • gsm_client: GCP Secret Manager client for retrieving credentials

Returns:
  • SQLAlchemy Engine connected to the Prod DB Replica