airbyte_ops_mcp.prod_db_access

Prod DB Access module for querying Airbyte Cloud Prod DB Replica.

This module provides:

  • sql.py: SQL query templates and schema documentation
  • db_engine.py: Database connection and engine management
  • queries.py: Query execution functions
 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2"""Prod DB Access module for querying Airbyte Cloud Prod DB Replica.
 3
 4This module provides:
 5- sql.py: SQL query templates and schema documentation
 6- db_engine.py: Database connection and engine management
 7- queries.py: Query execution functions
 8"""
 9
10from airbyte_ops_mcp.prod_db_access.db_engine import get_pool
11from airbyte_ops_mcp.prod_db_access.queries import (
12    query_versions_with_pins,
13)
14from airbyte_ops_mcp.prod_db_access.sql import (
15    SELECT_ACTORS_PINNED_TO_VERSION,
16    SELECT_CONNECTIONS_BY_CONNECTOR,
17    SELECT_CONNECTOR_VERSIONS,
18    SELECT_DATAPLANES_LIST,
19    SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION,
20    SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION,
21    SELECT_NEW_CONNECTOR_RELEASES,
22    SELECT_ORG_WORKSPACES,
23    SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION,
24    SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION,
25    SELECT_VERSION_ID_BY_TAG,
26    SELECT_VERSION_INFO_BY_ID,
27    SELECT_VERSIONS_WITH_PINS,
28    SELECT_VERSIONS_WITH_PINS_BY_DEFINITION,
29    SELECT_WORKSPACE_INFO,
30)
31
32__all__ = [
33    "SELECT_ACTORS_PINNED_TO_VERSION",
34    "SELECT_CONNECTIONS_BY_CONNECTOR",
35    "SELECT_CONNECTOR_VERSIONS",
36    "SELECT_DATAPLANES_LIST",
37    "SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION",
38    "SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION",
39    "SELECT_NEW_CONNECTOR_RELEASES",
40    "SELECT_ORG_WORKSPACES",
41    "SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION",
42    "SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION",
43    "SELECT_VERSIONS_WITH_PINS",
44    "SELECT_VERSIONS_WITH_PINS_BY_DEFINITION",
45    "SELECT_VERSION_ID_BY_TAG",
46    "SELECT_VERSION_INFO_BY_ID",
47    "SELECT_WORKSPACE_INFO",
48    "get_pool",
49    "query_versions_with_pins",
50]
SELECT_ACTORS_PINNED_TO_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTIONS_BY_CONNECTOR = <sqlalchemy.sql.elements.TextClause object>
SELECT_CONNECTOR_VERSIONS = <sqlalchemy.sql.elements.TextClause object>
SELECT_DATAPLANES_LIST = <sqlalchemy.sql.elements.TextClause object>
SELECT_DESTINATION_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_DESTINATION_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_NEW_CONNECTOR_RELEASES = <sqlalchemy.sql.elements.TextClause object>
SELECT_ORG_WORKSPACES = <sqlalchemy.sql.elements.TextClause object>
SELECT_SOURCE_SUCCESSFUL_SYNCS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_SOURCE_SYNC_RESULTS_FOR_VERSION = <sqlalchemy.sql.elements.TextClause object>
SELECT_VERSIONS_WITH_PINS = <sqlalchemy.sql.elements.TextClause object>
SELECT_VERSIONS_WITH_PINS_BY_DEFINITION = <sqlalchemy.sql.elements.TextClause object>
SELECT_VERSION_ID_BY_TAG = <sqlalchemy.sql.elements.TextClause object>
SELECT_VERSION_INFO_BY_ID = <sqlalchemy.sql.elements.TextClause object>
SELECT_WORKSPACE_INFO = <sqlalchemy.sql.elements.TextClause object>
def get_pool( gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient) -> sqlalchemy.engine.base.Engine:
176def get_pool(
177    gsm_client: secretmanager.SecretManagerServiceClient,
178) -> sqlalchemy.Engine:
179    """Get a SQLAlchemy connection pool for the Airbyte Cloud database.
180
181    This function connects with the Cloud SQL Python Connector in public IP mode.
182
183    Args:
184        gsm_client: GCP Secret Manager client for retrieving credentials
185
186    Returns:
187        SQLAlchemy Engine connected to the Prod DB Replica
188    """
189    pg_connection_details = json.loads(
190        _get_secret_value(
191            gsm_client, CONNECTION_RETRIEVER_PG_CONNECTION_DETAILS_SECRET_ID
192        )
193    )
194
195    return sqlalchemy.create_engine(
196        f"postgresql+{PG_DRIVER}://",
197        creator=get_database_creator(pg_connection_details),
198        connect_args={"timeout": DIRECT_CONNECTION_TIMEOUT},
199    )

Get a SQLAlchemy connection pool for the Airbyte Cloud database.

This function connects with the Cloud SQL Python Connector in public IP mode.

Arguments:
  • gsm_client: GCP Secret Manager client for retrieving credentials
Returns:

SQLAlchemy Engine connected to the Prod DB Replica

def query_versions_with_pins( actor_definition_id: str | None = None, *, gsm_client: google.cloud.secretmanager_v1.services.secret_manager_service.client.SecretManagerServiceClient | None = None) -> list[dict[str, typing.Any]]:
1049def query_versions_with_pins(
1050    actor_definition_id: str | None = None,
1051    *,
1052    gsm_client: secretmanager.SecretManagerServiceClient | None = None,
1053) -> list[dict[str, Any]]:
1054    """Query connector versions that have at least one pin.
1055
1056    Does NOT join `connector_rollout`, so each version appears exactly once
1057    regardless of how many rollouts reference it.  Includes per-scope pin
1058    breakdown (`actor_pins`, `workspace_pins`, `org_pins`).
1059
1060    Args:
1061        actor_definition_id: Optional connector definition UUID to filter results.
1062            If `None`, returns the global superset across all connectors.
1063        gsm_client: GCP Secret Manager client. If `None`, a new client will be instantiated.
1064
1065    Returns:
1066        List of version dicts ordered by `pin_count` DESC, `created_at` DESC.
1067    """
1068    if actor_definition_id is not None:
1069        return _run_sql_query(
1070            SELECT_VERSIONS_WITH_PINS_BY_DEFINITION,
1071            parameters={"actor_definition_id": actor_definition_id},
1072            query_name="SELECT_VERSIONS_WITH_PINS_BY_DEFINITION",
1073            gsm_client=gsm_client,
1074        )
1075    return _run_sql_query(
1076        SELECT_VERSIONS_WITH_PINS,
1077        parameters=None,
1078        query_name="SELECT_VERSIONS_WITH_PINS",
1079        gsm_client=gsm_client,
1080    )

Query connector versions that have at least one pin.

Does NOT join connector_rollout, so each version appears exactly once regardless of how many rollouts reference it. Includes per-scope pin breakdown (actor_pins, workspace_pins, org_pins).

Arguments:
  • actor_definition_id: Optional connector definition UUID to filter results. If None, returns the global superset across all connectors.
  • gsm_client: GCP Secret Manager client. If None, a new client will be instantiated.
Returns:

List of version dicts ordered by pin_count DESC, created_at DESC.