airbyte_ops_mcp.prod_db_access
Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
This module provides:
- sql.py: SQL query templates and schema documentation
- db_engine.py: Database connection and engine management
- queries.py: Query execution functions
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica.

This module provides:
- sql.py: SQL query templates and schema documentation
- db_engine.py: Database connection and engine management
- queries.py: Query execution functions
"""

from airbyte_ops_mcp.prod_db_access.db_engine import get_pool
from airbyte_ops_mcp.prod_db_access.sql import (
    SELECT_ACTORS_PINNED_TO_VERSION,
    SELECT_CONNECTIONS_BY_CONNECTOR,
    SELECT_CONNECTOR_VERSIONS,
    SELECT_DATAPLANES_LIST,
    SELECT_NEW_CONNECTOR_RELEASES,
    SELECT_ORG_WORKSPACES,
    SELECT_SUCCESSFUL_SYNCS_FOR_VERSION,
    SELECT_SYNC_RESULTS_FOR_VERSION,
    SELECT_WORKSPACE_INFO,
)

__all__ = [
    "SELECT_ACTORS_PINNED_TO_VERSION",
    "SELECT_CONNECTIONS_BY_CONNECTOR",
    "SELECT_CONNECTOR_VERSIONS",
    "SELECT_DATAPLANES_LIST",
    "SELECT_NEW_CONNECTOR_RELEASES",
    "SELECT_ORG_WORKSPACES",
    "SELECT_SUCCESSFUL_SYNCS_FOR_VERSION",
    "SELECT_SYNC_RESULTS_FOR_VERSION",
    "SELECT_WORKSPACE_INFO",
    "get_pool",
]
SELECT_ACTORS_PINNED_TO_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS = <sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST = <sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES = <sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES = <sqlalchemy.sql.elements.TextClause object>
SELECT_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO = <sqlalchemy.sql.elements.TextClause object>
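The listing above shows only the default object repr for each template, so the SQL text and its bind parameters are not visible here. The following is a minimal sketch of one way to inspect a template, assuming the constants are ordinary SQLAlchemy `text()` clauses, as the repr suggests. `describe_template` and `show_dataplanes_template` are hypothetical helper names, not part of this module.

```python
from typing import Any, List, Tuple


def describe_template(statement: Any) -> Tuple[str, List[str]]:
    """Return the SQL text and the sorted bind-parameter names of a statement.

    Relies on two standard SQLAlchemy behaviours:
    - str(statement) renders the SQL with the default dialect
    - statement.compile().params maps bind-parameter names to their values
    """
    sql_text = str(statement)
    bind_names = sorted(statement.compile().params)
    return sql_text, bind_names


def show_dataplanes_template() -> None:
    """Print one template; requires airbyte_ops_mcp to be installed."""
    # Imported lazily so the helper above can be used without the package.
    from airbyte_ops_mcp.prod_db_access import SELECT_DATAPLANES_LIST

    sql_text, bind_names = describe_template(SELECT_DATAPLANES_LIST)
    print(sql_text)
    print("bind parameters:", bind_names)
```

Checking the bind-parameter names this way before executing a template avoids guessing at them, since they are not documented on this page.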
def get_pool(gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
def get_pool(
    gsm_client: secretmanager.SecretManagerServiceClient,
) -> sqlalchemy.Engine:
    """Get a SQLAlchemy connection pool for the Airbyte Cloud database.

    This function connects with the Cloud SQL Python Connector in public IP mode.

    Args:
        gsm_client: GCP Secret Manager client for retrieving credentials

    Returns:
        SQLAlchemy Engine connected to the Prod DB Replica
    """
    pg_connection_details = json.loads(
        _get_secret_value(
            gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID
        )
    )

    return sqlalchemy.create_engine(
        f"postgresql+{PG_DRIVER}://",
        creator=get_database_creator(pg_connection_details),
        connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT},
    )
Get a SQLAlchemy connection pool for the Airbyte Cloud database.
This function connects with the Cloud SQL Python Connector in public IP mode.
Arguments:
- gsm_client: GCP Secret Manager client for retrieving credentials
Returns:
SQLAlchemy Engine connected to the Prod DB Replica
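As a usage illustration, here is a minimal sketch of how the engine returned by `get_pool` might be combined with one of the exported query templates. `fetch_rows` and `list_dataplanes` are hypothetical helpers written for this example. It also assumes that `SELECT_DATAPLANES_LIST` takes no bind parameters (a guess from its name, not confirmed by this page) and that Google application default credentials with access to the secret are available.

```python
from typing import Any, Dict, List, Mapping, Optional


def fetch_rows(
    engine: Any,
    statement: Any,
    params: Optional[Mapping[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """Execute a statement on a SQLAlchemy engine and return rows as dicts."""
    # The connection is returned to the pool when the `with` block exits.
    with engine.connect() as conn:
        # Pass None rather than an empty mapping when there are no parameters.
        result = conn.execute(statement, params or None)
        return [dict(row) for row in result.mappings()]


def list_dataplanes() -> List[Dict[str, Any]]:
    """Query the Prod DB Replica; requires GCP credentials and airbyte_ops_mcp."""
    # Imported lazily so fetch_rows stays usable without these packages.
    from google.cloud import secretmanager

    from airbyte_ops_mcp.prod_db_access import SELECT_DATAPLANES_LIST, get_pool

    gsm_client = secretmanager.SecretManagerServiceClient()
    engine = get_pool(gsm_client)
    try:
        # Assumption: this template needs no bind parameters.
        return fetch_rows(engine, SELECT_DATAPLANES_LIST)
    finally:
        # Release pooled connections once the one-off query is done.
        engine.dispose()
```

For templates that do take bind parameters, the values would be passed as the `params` mapping. Because `get_pool` fetches a secret and builds a new engine on every call, a long-running process would probably create the engine once and reuse it rather than calling `list_dataplanes` repeatedly.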