airbyte_ops_mcp.prod_db_access
Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
This module provides:
- sql.py: SQL query templates and schema documentation
- db_engine.py: Database connection and engine management
- queries.py: Query execution functions
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica. 3 4This module provides: 5- sql.py: SQL query templates and schema documentation 6- db_engine.py: Database connection and engine management 7- queries.py: Query execution functions 8""" 9 10from airbyte_ops_mcp.prod_db_access.db_engine import get_pool 11from airbyte_ops_mcp.prod_db_access.queries import ( 12 query_versions_with_pins, 13) 14from airbyte_ops_mcp.prod_db_access.sql import ( 15 SELECT_ACTORS_PINNED_TO_VERSION, 16 SELECT_CONNECTIONS_BY_CONNECTOR, 17 SELECT_CONNECTOR_VERSIONS, 18 SELECT_DATAPLANES_LIST, 19 SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION, 20 SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION, 21 SELECT_NEW_CONNECTOR_RELEASES, 22 SELECT_ORG_WORKSPACES, 23 SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION, 24 SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION, 25 SELECT_VERSION_ID_BY_TAG, 26 SELECT_VERSION_INFO_BY_ID, 27 SELECT_VERSIONS_WITH_PINS, 28 SELECT_VERSIONS_WITH_PINS_BY_DEFINITION, 29 SELECT_WORKSPACE_INFO, 30) 31 32__all__ = [ 33 "SELECT_ACTORS_PINNED_TO_VERSION", 34 "SELECT_CONNECTIONS_BY_CONNECTOR", 35 "SELECT_CONNECTOR_VERSIONS", 36 "SELECT_DATAPLANES_LIST", 37 "SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION", 38 "SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION", 39 "SELECT_NEW_CONNECTOR_RELEASES", 40 "SELECT_ORG_WORKSPACES", 41 "SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION", 42 "SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION", 43 "SELECT_VERSIONS_WITH_PINS", 44 "SELECT_VERSIONS_WITH_PINS_BY_DEFINITION", 45 "SELECT_VERSION_ID_BY_TAG", 46 "SELECT_VERSION_INFO_BY_ID", 47 "SELECT_WORKSPACE_INFO", 48 "get_pool", 49 "query_versions_with_pins", 50]
SELECT_ACTORS_PINNED_TO_VERSION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR =
<sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS =
<sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST =
<sqlalchemy.sql.elements.TextClause object>
SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES =
<sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES =
<sqlalchemy.sql.elements.TextClause object>
SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_VERSIONS_WITH_PINS =
<sqlalchemy.sql.elements.TextClause object>
SELECT_VERSIONS_WITH_PINS_BY_DEFINITION =
<sqlalchemy.sql.elements.TextClause object>
SELECT_VERSION_ID_BY_TAG =
<sqlalchemy.sql.elements.TextClause object>
SELECT_VERSION_INFO_BY_ID =
<sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO =
<sqlalchemy.sql.elements.TextClause object>
def
get_pool( gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
176def get_pool( 177 gsm_client: secretmanager.SecretManagerServiceClient, 178) -> sqlalchemy.Engine: 179 """Get a SQLAlchemy connection pool for the Airbyte Cloud database. 180 181 This function connects with the Cloud SQL Python Connector in public IP mode. 182 183 Args: 184 gsm_client: GCP Secret Manager client for retrieving credentials 185 186 Returns: 187 SQLAlchemy Engine connected to the Prod DB Replica 188 """ 189 pg_connection_details = json.loads( 190 _get_secret_value( 191 gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID 192 ) 193 ) 194 195 return sqlalchemy.create_engine( 196 f"postgresql+{PG_DRIVER}://", 197 creator=get_database_creator(pg_connection_details), 198 connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT}, 199 )
Get a SQLAlchemy connection pool for the Airbyte Cloud database.
This function connects with the Cloud SQL Python Connector in public IP mode.
Arguments:
- gsm_client: GCP Secret Manager client for retrieving credentials
Returns:
SQLAlchemy Engine connected to the Prod DB Replica
def
query_versions_with_pins( actor_definition_id: str | None = None, *, gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient | None = None) -> list[dict[str, typing.Any]]:
1049def query_versions_with_pins( 1050 actor_definition_id: str | None = None, 1051 *, 1052 gsm_client: secretmanager.SecretManagerServiceClient | None = None, 1053) -> list[dict[str, Any]]: 1054 """Query connector versions that have at least one pin. 1055 1056 Does NOT join `connector_rollout`, so each version appears exactly once 1057 regardless of how many rollouts reference it. Includes per-scope pin 1058 breakdown (`actor_pins`, `workspace_pins`, `org_pins`). 1059 1060 Args: 1061 actor_definition_id: Optional connector definition UUID to filter results. 1062 If `None`, returns the global superset across all connectors. 1063 gsm_client: GCP Secret Manager client. If `None`, a new client will be instantiated. 1064 1065 Returns: 1066 List of version dicts ordered by `pin_count` DESC, `created_at` DESC. 1067 """ 1068 if actor_definition_id is not None: 1069 return _run_sql_query( 1070 SELECT_VERSIONS_WITH_PINS_BY_DEFINITION, 1071 parameters={"actor_definition_id": actor_definition_id}, 1072 query_name="SELECT_VERSIONS_WITH_PINS_BY_DEFINITION", 1073 gsm_client=gsm_client, 1074 ) 1075 return _run_sql_query( 1076 SELECT_VERSIONS_WITH_PINS, 1077 parameters=None, 1078 query_name="SELECT_VERSIONS_WITH_PINS", 1079 gsm_client=gsm_client, 1080 )
Query connector versions that have at least one pin.
Does NOT join connector_rollout, so each version appears exactly once
regardless of how many rollouts reference it. Includes per-scope pin
breakdown (actor_pins, workspace_pins, org_pins).
Arguments:
- actor_definition_id: Optional connector definition UUID to filter results.
If
None, returns the global superset across all connectors. - gsm_client: GCP Secret Manager client. If
None, a new client will be instantiated.
Returns:
List of version dicts ordered by
pin_countDESC,created_atDESC.