airbyte_ops_mcp.mcp.prod_db_queries

MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

MCP reference

MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 14 tool(s), 0 prompt(s), 0 resource(s).

Tools (14)

query_prod_actors_by_pinned_connector_version

Hints: read-only · idempotent

List actors (sources/destinations) effectively pinned to a specific connector version.

Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.

actor_id is the generic actor ID: an actor is either a source or a destination, so this field holds what would otherwise be a source_id or a destination_id.

Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name.

pin_scope_type is 'actor', 'workspace', or 'organization', indicating which scope level the effective pin came from.
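The actor > workspace > organization precedence can be illustrated with a minimal sketch. It is not the module's implementation: it assumes the pins for a single actor are available as a dict keyed by scope name, and `resolve_effective_pin` is a hypothetical helper.

```python
from typing import Optional

# Precedence order from the tool description: actor > workspace > organization.
SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def resolve_effective_pin(
    pins: dict[str, Optional[str]],
) -> tuple[Optional[str], Optional[str]]:
    """Return (pinned_version_id, pin_scope_type) for one actor.

    `pins` maps a scope name to the connector version UUID pinned at that
    scope, or None when that scope has no pin (hypothetical input shape).
    """
    for scope in SCOPE_PRECEDENCE:
        version_id = pins.get(scope)
        if version_id is not None:
            return version_id, scope
    # No pin at any scope: the actor follows the default version.
    return None, None


# An actor-level pin overrides an organization-level pin.
print(resolve_effective_pin({"actor": "v-actor", "organization": "v-org"}))
# A workspace-level pin applies when there is no actor-level pin.
print(resolve_effective_pin({"workspace": "v-ws", "organization": "v-org"}))
```

Under this reading, an actor is reported for a given connector_version_id only when the version returned here matches it; an actor with its own actor-level pin to a different version is not reported, even if its organization is pinned to the requested one.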

Parameters:

  • connector_version_id (string, required): Connector version UUID to find pinned instances for.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find pinned instances for",
      "type": "string"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connection_sync_activity

Hints: read-only · idempotent · open-world

List recent sync jobs and attempts from the Prod DB Replica.

Returns one row per (job, attempt) pair for sync jobs whose updated_at falls in [start_at, end_at), scoped to the provided organization, workspace, or connection IDs. Designed for live operational lookups — e.g. "what happened on this connection in the last hour" — not for historical analysis.

Each row is enriched with customer_tier and is_eu for the owning organization. Tier filtering is intentionally not applied — this is a read-only observability query.

Input requirements:

  • At least one of organization_id, workspace_id, or connection_ids must be provided (any combination is accepted).
  • start_at and end_at must be timezone-aware and start_at < end_at.
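The input requirements above can be expressed as a small validation sketch. The function name and signature are hypothetical; the tool receives start_at and end_at as ISO 8601 strings, and this sketch assumes they have already been parsed into datetime objects.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Sequence


def validate_sync_activity_inputs(
    start_at: datetime,
    end_at: datetime,
    organization_id: Optional[str] = None,
    workspace_id: Optional[str] = None,
    connection_ids: Optional[Sequence[str]] = None,
) -> None:
    """Raise ValueError when the documented input requirements are not met."""
    # At least one scope is required; any combination is accepted.
    # This sketch treats an empty connection_ids list as "not provided".
    if organization_id is None and workspace_id is None and not connection_ids:
        raise ValueError(
            "Provide at least one of organization_id, workspace_id, or connection_ids."
        )
    # Naive datetimes have no UTC offset, so they are rejected.
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        if value.utcoffset() is None:
            raise ValueError(f"{name} must be timezone-aware")
    # The window is half-open, [start_at, end_at), so it must be non-empty.
    if start_at >= end_at:
        raise ValueError("start_at must be strictly before end_at")


# Example: a "last hour" window in UTC, serialized as ISO 8601 tool arguments.
end = datetime.now(timezone.utc)
start = end - timedelta(hours=1)
validate_sync_activity_inputs(start, end, connection_ids=["<connection-uuid>"])  # placeholder ID
arguments = {"start_at": start.isoformat(), "end_at": end.isoformat()}
print(arguments)
```

Using timezone.utc makes isoformat() emit an explicit +00:00 offset, which satisfies the timezone-aware requirement.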

Key fields in each row:

  • job_id, attempt_id, attempt_number
  • job_status, attempt_status
  • job_started_at, job_updated_at, attempt_ended_at
  • failure_summary (JSON; populated when an attempt failed)
  • connection_id, connection_name, connection_status
  • source_actor_id, source_actor_name, source_actor_definition_id
  • destination_actor_id, destination_actor_name, destination_actor_definition_id
  • workspace_id, workspace_name, organization_id
  • dataplane_group_id, dataplane_name
  • customer_tier, is_eu (added by tier enrichment)
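Since the tool returns one row per (job, attempt) pair, a retried job appears once per attempt. A minimal, hypothetical helper that keeps only the latest attempt of each job, assuming rows are dicts with the job_id and attempt_number keys listed above (the sample rows are made up):

```python
from typing import Any


def latest_attempt_per_job(rows: list[dict[str, Any]]) -> dict[Any, dict[str, Any]]:
    """Map each job_id to the row with the highest attempt_number."""
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        current = latest.get(row["job_id"])
        if current is None or row["attempt_number"] > current["attempt_number"]:
            latest[row["job_id"]] = row
    return latest


# Made-up rows: job 1 failed once and then succeeded on retry.
rows = [
    {"job_id": 1, "attempt_number": 0, "attempt_status": "failed"},
    {"job_id": 1, "attempt_number": 1, "attempt_status": "succeeded"},
    {"job_id": 2, "attempt_number": 0, "attempt_status": "failed"},
]
for job_id, row in latest_attempt_per_job(rows).items():
    print(job_id, row["attempt_status"])  # prints "1 succeeded", then "2 failed"
```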

Parameters:

  • start_at (string, required): Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or Z).
  • end_at (string, required): Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after start_at.
  • organization_id (string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null, optional, default null): Optional organization UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @airbyte-internal as an alias for the Airbyte internal org.
  • workspace_id (string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | null, optional, default null): Optional workspace UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @devin-ai-sandbox as an alias for the Devin AI sandbox workspace.
  • connection_ids (array<string> | null, optional, default null): Optional list of connection UUIDs. At least one of organization_id, workspace_id, or connection_ids is required.
  • status_filter (enum("all", "succeeded", "failed"), optional, default "all"): Filter by job status: all (default), succeeded, or failed. Applied to jobs.status in the Prod DB Replica.
  • limit (integer, optional, default 1000): Maximum number of attempt rows to return.
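The alias behavior described for organization_id and workspace_id can be sketched with plain dicts. The alias-to-UUID pairs come from the parameter list above; `resolve_id` and the two dicts are hypothetical stand-ins for the module's own alias enums (the schema mentions `OrganizationAliasEnum.resolve()` and `WorkspaceAliasEnum.resolve()`, whose exact behavior is not documented here). Rejecting strings that are not valid UUIDs is an extra check this sketch adds.

```python
import uuid
from typing import Optional

# Alias/UUID pairs taken from the parameter list above. These dicts are
# hypothetical stand-ins for the module's alias enums.
ORGANIZATION_ALIASES = {"@airbyte-internal": "664c690e-5263-49ba-b01f-4a6759b3330a"}
WORKSPACE_ALIASES = {"@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61"}


def resolve_id(value: Optional[str], aliases: dict[str, str]) -> Optional[str]:
    """Return the UUID for an alias, or the normalized UUID if one was given."""
    if value is None:
        return None
    if value.startswith("@"):
        if value not in aliases:
            raise ValueError(f"Unknown alias: {value}")
        return aliases[value]
    # Extra check added by this sketch: plain values must parse as UUIDs.
    # str(uuid.UUID(...)) returns the canonical lowercase hyphenated form.
    return str(uuid.UUID(value))


print(resolve_id("@airbyte-internal", ORGANIZATION_ALIASES))
```

Keeping organization and workspace aliases in separate tables means an organization alias passed as a workspace_id is rejected rather than silently resolved.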

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "start_at": {
      "description": "Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or `Z`).",
      "format": "date-time",
      "type": "string"
    },
    "end_at": {
      "description": "Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after `start_at`.",
      "format": "date-time",
      "type": "string"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@airbyte-internal` as an alias for the Airbyte internal org."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional workspace UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@devin-ai-sandbox` as an alias for the Devin AI sandbox workspace."
    },
    "connection_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional list of connection UUIDs. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required."
    },
    "status_filter": {
      "description": "Filter by job status: `all` (default), `succeeded`, or `failed`. Applied to `jobs.status` in the Prod DB Replica.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of attempt rows to return.",
      "type": "integer"
    }
  },
  "required": [
    "start_at",
    "end_at"
  ],
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_connector

Hints: read-only · idempotent · open-world

Search for all connections using a specific source or destination connector type.

This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.

Results are always enriched with customer_tier and is_eu fields. Querying is always tier-aware: customer_tier_filter defaults to 'TIER_2' when omitted, so pass 'ALL' explicitly to include every tier.

Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.

Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs. The keys depend on the connector type:

  • Source queries: connection_id, connection_name, connection_url, source_id, source_name, source_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu
  • Destination queries: connection_id, connection_name, connection_url, destination_id, destination_name, destination_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu

pin_scope_type is 'actor', 'workspace', or 'organization', indicating which scope level the effective pin came from (NULL if the connection is not pinned).
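Because pin_scope_type is null for unpinned connections, a result list can be summarized by pin scope, for example to see how many connections are held at each level before a rollout. The helper below is a hypothetical client-side sketch that operates on rows the tool has already returned; the sample rows are made up.

```python
from collections import Counter
from typing import Any, Iterable


def pin_scope_breakdown(connections: Iterable[dict[str, Any]]) -> Counter:
    """Count connections by the scope their effective pin came from."""
    # pin_scope_type is null (None in Python) for unpinned connections.
    return Counter(row.get("pin_scope_type") or "unpinned" for row in connections)


# Made-up result rows; only the pin_scope_type key matters here.
rows = [
    {"connection_id": "c1", "pin_scope_type": "actor"},
    {"connection_id": "c2", "pin_scope_type": None},
    {"connection_id": "c3", "pin_scope_type": "organization"},
    {"connection_id": "c4", "pin_scope_type": None},
]
print(pin_scope_breakdown(rows))
```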

Parameters:

  • source_definition_id (string | null, optional, default null): Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
  • source_canonical_name (string | null, optional, default null): Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
  • destination_definition_id (string | null, optional, default null): Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB.
  • destination_canonical_name (string | null, optional, default null): Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'.
  • organization_id (string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null, optional, default null): Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization are returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
  • limit (integer, optional, default 1000): Maximum number of results.
  • customer_tier_filter (enum("TIER_0", "TIER_1", "TIER_2", "ALL"), optional, default "TIER_2"): Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
  • exclude_pinned (boolean, optional, default false): If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).
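The exactly-one-of rule for the four connector selectors can be checked on the client side before calling the tool. This is a hypothetical sketch, not the tool's own validation; the function name, error types, and messages are assumptions.

```python
from typing import Optional

SELECTORS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def pick_connector_selector(**kwargs: Optional[str]) -> tuple[str, str]:
    """Return the single (parameter_name, value) pair that identifies the connector."""
    unexpected = sorted(set(kwargs) - set(SELECTORS))
    if unexpected:
        raise TypeError(f"Unexpected arguments: {unexpected}")
    provided = [(name, value) for name, value in kwargs.items() if value is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one of {', '.join(SELECTORS)} is required; got {len(provided)}."
        )
    return provided[0]


name, value = pick_connector_selector(source_canonical_name="source-youtube-analytics")
# The parameter prefix tells source queries apart from destination queries.
connector_type = name.split("_", 1)[0]
print(name, value, connector_type)
```

Deriving the connector type from the parameter prefix mirrors how the tool returns source_* keys for source queries and destination_* keys for destination queries.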

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of results (default: 1000)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_stream

Hints: read-only · idempotent · open-world

Find connections that have a specific stream enabled in their catalog.

This tool searches each connection's configured catalog (stored as JSONB) for a stream with exactly the specified name. It is particularly useful when validating connector fixes that affect specific streams: you can quickly find customer connections that use the affected stream.
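The matching rule can be sketched in Python against a parsed catalog. This assumes the stored catalog follows the Airbyte protocol's ConfiguredAirbyteCatalog shape (a streams array whose entries carry stream.name); that shape and the `catalog_has_stream` helper are assumptions of this sketch, and the tool itself runs its search against the JSONB column in the database.

```python
import json
from typing import Any


def catalog_has_stream(catalog: dict[str, Any], stream_name: str) -> bool:
    """True if any configured stream has exactly this name (case-sensitive)."""
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )


# A trimmed, made-up catalog in the ConfiguredAirbyteCatalog shape.
catalog = json.loads(
    '{"streams": [{"stream": {"name": "campaigns"}},'
    ' {"stream": {"name": "global_exclusions"}}]}'
)
print(catalog_has_stream(catalog, "global_exclusions"))  # True
print(catalog_has_stream(catalog, "Campaigns"))  # False: exact match only
```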

Results are always enriched with customer_tier and is_eu fields. Querying is always tier-aware: customer_tier_filter defaults to 'TIER_2' when omitted, so pass 'ALL' explicitly to include every tier.

Use cases:

  • Finding connections with a specific stream enabled for regression testing
  • Validating connector fixes that affect particular streams
  • Identifying which customers use rarely-enabled streams

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.

Parameters:

  • stream_name (string, required): Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.
  • source_definition_id (string | null, optional, default null): Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
  • source_canonical_name (string | null, optional, default null): Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'.
  • organization_id (string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null, optional, default null): Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization are returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
  • limit (integer, optional, default 100): Maximum number of results.
  • customer_tier_filter (enum("TIER_0", "TIER_1", "TIER_2", "ALL"), optional, default "TIER_2"): Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "stream_name": {
      "description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
      "type": "string"
    },
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "stream_name"
  ],
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connector_connection_stats

Hints: read-only · idempotent · open-world

Get aggregate connection stats for multiple connectors.

Returns counts of connections grouped by pinned version for each connector, including:

  • Total, enabled, and active connection counts
  • Pinned vs unpinned breakdown
  • Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

This tool is designed for release monitoring workflows. It allows you to:

  1. Query recently released connectors to identify which ones to monitor
  2. Get aggregate stats showing how many connections are using each version
  3. See health metrics (pass/fail) broken down by version
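Step 3 can be sketched against this tool's output schema: each by_version entry carries a latest_attempt breakdown, from which a per-version failure rate follows. This is a hypothetical client-side helper; counting only succeeded and failed connections in the denominator (ignoring cancelled, running, and unknown) is a choice made for this sketch, and the numbers are made up.

```python
from typing import Any, Optional


def failure_rate(latest_attempt: dict[str, int]) -> Optional[float]:
    """failed / (succeeded + failed), or None if neither count is non-zero."""
    succeeded = latest_attempt.get("succeeded", 0)
    failed = latest_attempt.get("failed", 0)
    finished = succeeded + failed
    return failed / finished if finished else None


def version_health(connector_stats: dict[str, Any]) -> list[tuple[str, Optional[float]]]:
    """Label each by_version entry and compute its failure rate."""
    health = []
    for version in connector_stats["by_version"]:
        # pinned_version_id is None for unpinned connections.
        label = (
            version.get("docker_image_tag")
            or version["pinned_version_id"]
            or "unpinned (default version)"
        )
        health.append((label, failure_rate(version["latest_attempt"])))
    return health


# Made-up numbers in the shape of one entry of the "sources" array.
stats = {
    "by_version": [
        {"pinned_version_id": None, "docker_image_tag": None,
         "latest_attempt": {"succeeded": 90, "failed": 10}},
        {"pinned_version_id": "<version-uuid>", "docker_image_tag": "1.2.3",
         "latest_attempt": {"succeeded": 3, "failed": 1, "unknown": 5}},
    ]
}
for label, rate in version_health(stats):
    print(label, rate)
```

Returning None instead of 0.0 when no connection finished keeps "no data" distinct from "no failures", which matters for freshly released versions with few active connections.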

The lookback_days parameter controls the lookback window for:

  • Counting 'active' connections (those with recent sync activity)
  • Determining 'latest attempt status' (most recent attempt within the window)

Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
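The lookback semantics can be sketched in plain Python. This assumes each connection's attempts are available as (timestamp, status) pairs with a single per-attempt timestamp; which column the real query uses to place an attempt in the window is not stated here, and both function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

STATUSES = ("succeeded", "failed", "cancelled", "running", "unknown")


def latest_attempt_status(
    attempts: list[tuple[datetime, str]], now: datetime, lookback_days: int = 7
) -> str:
    """Status of the most recent attempt inside the window, else 'unknown'."""
    window_start = now - timedelta(days=lookback_days)
    recent = [(ts, status) for ts, status in attempts if ts >= window_start]
    if not recent:
        return "unknown"
    return max(recent, key=lambda pair: pair[0])[1]


def latest_attempt_breakdown(
    connections: dict[str, list[tuple[datetime, str]]],
    now: datetime,
    lookback_days: int = 7,
) -> dict[str, int]:
    """Count connections per latest-attempt status; every key defaults to 0."""
    counts = dict.fromkeys(STATUSES, 0)
    for attempts in connections.values():
        status = latest_attempt_status(attempts, now, lookback_days)
        counts[status] = counts.get(status, 0) + 1
    return counts


now = datetime(2024, 6, 8, tzinfo=timezone.utc)
connections = {
    "conn-a": [(now - timedelta(days=1), "failed"), (now - timedelta(hours=1), "succeeded")],
    "conn-b": [(now - timedelta(days=10), "succeeded")],  # outside a 7-day window
    "conn-c": [(now - timedelta(days=2), "running")],
}
print(latest_attempt_breakdown(connections, now))
print(latest_attempt_breakdown(connections, now, lookback_days=14))
```

Under this reading, active_connections corresponds to the connections whose status is anything other than 'unknown', since both are defined by activity inside the same window.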

Parameters:

  • source_definition_ids (array<string> | null, optional, default null): List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']
  • destination_definition_ids (array<string> | null, optional, default null): List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']
  • lookback_days (integer, optional, default 7): Number of days to look back. Connections with sync activity within this window are counted as active, and the same window determines each connection's latest attempt status.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "source_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
    },
    "destination_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
      "type": "integer"
    }
  },
  "type": "object"
}

Output JSON schema:

{
  "description": "Response containing connection stats for multiple connectors.",
  "properties": {
    "sources": {
      "description": "Stats for source connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "destinations": {
      "description": "Stats for destination connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "lookback_days": {
      "description": "Lookback window used for 'active' connections",
      "type": "integer"
    },
    "generated_at": {
      "description": "When this response was generated",
      "format": "date-time",
      "type": "string"
    }
  },
  "required": [
    "lookback_days",
    "generated_at"
  ],
  "type": "object"
}
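The latest_attempt breakdown above can be reduced to a single health figure for each connector or pinned version. The sketch below is one possible metric, not something the tool computes. It assumes that running and unknown connections should be left out of the denominator, because their outcome is not yet known. The helper name latest_attempt_success_rate is hypothetical.

```python
from typing import Mapping, Optional


def latest_attempt_success_rate(latest_attempt: Mapping[str, int]) -> Optional[float]:
    """Share of finished latest attempts that succeeded (hypothetical helper).

    `running` and `unknown` are excluded because their outcome is not known.
    Missing keys count as 0, matching the schema defaults.
    """
    succeeded = latest_attempt.get("succeeded", 0)
    finished = succeeded + latest_attempt.get("failed", 0) + latest_attempt.get("cancelled", 0)
    if finished == 0:
        return None  # nothing finished yet, so there is no rate to report
    return succeeded / finished


# Example: one `latest_attempt` object (made-up numbers).
entry = {"succeeded": 45, "failed": 4, "cancelled": 1, "running": 2, "unknown": 8}
print(latest_attempt_success_rate(entry))  # 0.9
```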

query_prod_connector_rollouts

Hints: read-only · idempotent

Query connector rollouts with flexible filtering.

Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.

Filter behavior:

  • rollout_id: Returns that specific rollout (ignores other filters)
  • active_only: Returns only active (non-terminal) rollouts
  • actor_definition_id: Returns rollouts for that specific connector
  • No filters: Returns all active rollouts (same as active_only=True)
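The filter precedence above can be mirrored client-side over a list of rollout dicts. The following is a minimal sketch under two assumptions. First, it treats succeeded, failed_rolled_back, and canceled as the terminal states and every other state in the output schema as active; the tool's own definition may differ, particularly for errored. Second, it assumes actor_definition_id and active_only combine when both are given. filter_rollouts is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional

# Assumption: terminal rollout states. All other states are treated as active.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def filter_rollouts(
    rollouts: List[Dict[str, Any]],
    rollout_id: Optional[str] = None,
    active_only: bool = False,
    actor_definition_id: Optional[str] = None,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Client-side mirror of the documented filter behavior (illustrative only)."""
    # A specific rollout_id wins and ignores every other filter.
    if rollout_id is not None:
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:1]
    # With no connector filter, fall back to active rollouts only.
    if actor_definition_id is None:
        active_only = True
    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    if active_only:
        selected = [r for r in selected if r["state"] not in TERMINAL_STATES]
    return selected[:limit]


sample = [
    {"rollout_id": "r1", "actor_definition_id": "def-a", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "def-a", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "def-b", "state": "paused"},
]
print([r["rollout_id"] for r in filter_rollouts(sample)])  # ['r1', 'r3']
print([r["rollout_id"] for r in filter_rollouts(sample, actor_definition_id="def-a")])  # ['r1', 'r2']
```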

Parameters:

Name Type Required Default Description
actor_definition_id string | null no null Connector definition UUID to filter by (optional)
rollout_id string | null no null Specific rollout UUID to look up (optional)
active_only boolean no false If true, only return active (non-terminal) rollouts
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "actor_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector definition UUID to filter by (optional)"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specific rollout UUID to look up (optional)"
    },
    "active_only": {
      "default": false,
      "description": "If true, only return active (non-terminal) rollouts",
      "type": "boolean"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a connector rollout.",
        "properties": {
          "rollout_id": {
            "description": "The rollout UUID",
            "type": "string"
          },
          "actor_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "state": {
            "description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
            "type": "string"
          },
          "initial_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Initial rollout percentage"
          },
          "current_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Current target rollout percentage"
          },
          "final_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Final target rollout percentage"
          },
          "has_breaking_changes": {
            "description": "Whether the RC has breaking changes",
            "type": "boolean"
          },
          "max_step_wait_time_mins": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Maximum wait time between rollout steps in minutes"
          },
          "rollout_strategy": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Rollout strategy: manual, automated, overridden"
          },
          "updated_by_user_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "User ID recorded as last updating the rollout"
          },
          "updated_by_user_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Name recorded as last updating the rollout"
          },
          "updated_by_user_email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Email recorded as last updating the rollout"
          },
          "workflow_run_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Temporal workflow run ID"
          },
          "error_msg": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Error message if errored"
          },
          "failed_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for failure if failed"
          },
          "paused_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for pause if paused"
          },
          "tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Optional tag for the rollout"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was created"
          },
          "updated_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was last updated"
          },
          "completed_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout completed (if terminal)"
          },
          "expires_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout expires"
          },
          "rc_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the release candidate"
          },
          "rc_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the release candidate"
          },
          "initial_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the initial version"
          },
          "initial_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the initial version"
          },
          "filters": {
            "anyOf": [
              {
                "additionalProperties": true,
                "type": "object"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
          }
        },
        "required": [
          "rollout_id",
          "actor_definition_id",
          "state",
          "has_breaking_changes"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
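The customer_tier field is described as being extracted from filters. Below is a minimal sketch of that extraction. It assumes the tier appears only in the {'tierFilter': {'tier': ...}} shape shown in the filters example; the server-side parsing may handle other shapes. extract_customer_tier is a hypothetical name.

```python
from typing import Any, Dict, Optional


def extract_customer_tier(filters: Optional[Dict[str, Any]]) -> Optional[str]:
    """Return the targeted tier from a raw rollout `filters` object, else None.

    Assumes the shape {'tierFilter': {'tier': 'TIER_0'}} from the schema example.
    """
    if not filters:
        return None
    tier_filter = filters.get("tierFilter")
    if not isinstance(tier_filter, dict):
        return None
    tier = tier_filter.get("tier")
    return tier if isinstance(tier, str) else None


print(extract_customer_tier({"tierFilter": {"tier": "TIER_0"}}))  # TIER_0
print(extract_customer_tier(None))  # None
```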

query_prod_connector_versions

Hints: read-only · idempotent

List all versions for a connector definition.

Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.

Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
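A common follow-up is to resolve a human-readable image tag to the version_id that query_prod_actors_by_pinned_connector_version or query_prod_recent_syncs_for_version_pinned_connector expects. The sketch below assumes that `versions` is the result list from this tool, in its documented newest-first order. Under that assumption, the first match is the most recently published one. find_version_id is a hypothetical helper.

```python
from typing import Any, Dict, List, Optional


def find_version_id(versions: List[Dict[str, Any]], docker_image_tag: str) -> Optional[str]:
    """Return the version_id of the first row whose docker_image_tag matches, or None."""
    for version in versions:
        if version.get("docker_image_tag") == docker_image_tag:
            return version.get("version_id")
    return None


# Illustrative rows only; real version IDs are UUIDs.
versions = [
    {"version_id": "v-3", "docker_image_tag": "1.2.0-rc.1"},
    {"version_id": "v-2", "docker_image_tag": "1.1.0"},
]
print(find_version_id(versions, "1.1.0"))  # v-2
```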

Parameters:

Name Type Required Default Description
connector_definition_id string yes Connector definition UUID to list versions for

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_definition_id": {
      "description": "Connector definition UUID to list versions for",
      "type": "string"
    }
  },
  "required": [
    "connector_definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_dataplanes

Hints: read-only · idempotent

List all dataplane groups with workspace counts.

Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).

Returns list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
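To see how workspaces are distributed across regions, the rows can be reduced to a share per dataplane_name. This is a minimal client-side sketch. It sums counts by name in case more than one group reports the same name. workspace_share_by_dataplane is a hypothetical helper.

```python
from collections import Counter
from typing import Any, Dict, List


def workspace_share_by_dataplane(groups: List[Dict[str, Any]]) -> Dict[str, float]:
    """Fraction of all workspaces that sit in each dataplane_name."""
    counts = Counter()
    for group in groups:
        counts[group["dataplane_name"]] += group.get("workspace_count", 0)
    total = sum(counts.values())
    if total == 0:
        return {}
    return {name: count / total for name, count in counts.items()}


rows = [
    {"dataplane_name": "US", "workspace_count": 750},
    {"dataplane_name": "EU", "workspace_count": 250},
]
print(workspace_share_by_dataplane(rows))  # {'US': 0.75, 'EU': 0.25}
```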

Parameters:

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_failed_sync_attempts_for_connector

Hints: read-only · idempotent · open-world

List failed sync attempts for ALL actors using a source connector type.

This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.

Results are always enriched with the customer_tier and is_eu fields. Queries are always tier-aware: customer_tier_filter limits results to a single tier (or 'ALL' for every tier) and defaults to 'TIER_2' when omitted.

Use it to investigate connector issues across all users, regardless of which connector version each user is on.

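Note: This tool only supports SOURCE connectors. For destination failures, query_prod_recent_syncs_for_connector with status_filter='failed' covers both sources and destinations, at the sync-job level rather than the attempt level.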

Key fields in results:

  • failure_summary: JSON containing failure details including failureType and messages
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
lookback_days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_new_connector_releases

Hints: read-only · idempotent

List recently published connector versions.

Returns connector versions published within the specified number of days. Filtering uses the last_published timestamp, which reflects when the version was actually deployed to the registry (not the changelog date).

Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
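For a quick per-connector view of what shipped, the result rows can be grouped by docker_repository. This is a minimal sketch. It assumes that last_published values compare chronologically, which holds for datetime objects or for ISO-8601 strings in one consistent format. releases_by_repository is a hypothetical helper, and the rows are made up.

```python
from collections import defaultdict
from typing import Any, Dict, List


def releases_by_repository(rows: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Map docker_repository to its image tags, most recently published first."""
    grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        grouped[row["docker_repository"]].append(row)
    return {
        repo: [r["docker_image_tag"] for r in sorted(items, key=lambda row: row["last_published"], reverse=True)]
        for repo, items in grouped.items()
    }


rows = [
    {"docker_repository": "airbyte/source-example", "docker_image_tag": "1.0.0", "last_published": "2024-05-01T10:00:00Z"},
    {"docker_repository": "airbyte/source-example", "docker_image_tag": "1.0.1", "last_published": "2024-05-03T09:00:00Z"},
    {"docker_repository": "airbyte/destination-example", "docker_image_tag": "0.4.0", "last_published": "2024-05-02T12:00:00Z"},
]
print(releases_by_repository(rows))
# {'airbyte/source-example': ['1.0.1', '1.0.0'], 'airbyte/destination-example': ['0.4.0']}
```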

Parameters:

Name Type Required Default Description
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_recent_syncs_for_connector

Hints: read-only · idempotent · open-world

List recent sync jobs for ALL actors using a connector type.

This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.

Results are always enriched with the customer_tier and is_eu fields. Queries are always tier-aware: customer_tier_filter limits results to a single tier (or 'ALL' for every tier) and defaults to 'TIER_2' when omitted.

Use this tool to:

  • Find healthy connections with recent successful syncs (status_filter='succeeded')
  • Investigate connector issues across all users (status_filter='failed')
  • Get an overview of all recent sync activity (status_filter='all')

Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
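Callers can check the exactly-one rule before invoking the tool. The sketch below is a hypothetical client-side helper, connector_selector, that returns the single connector argument or raises ValueError. It does not reproduce the server's own validation.

```python
from typing import Dict, Optional

CONNECTOR_KEYS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def connector_selector(**kwargs: Optional[str]) -> Dict[str, str]:
    """Return the one non-null connector argument, or raise ValueError."""
    unknown = set(kwargs) - set(CONNECTOR_KEYS)
    if unknown:
        raise ValueError(f"unexpected arguments: {sorted(unknown)}")
    provided = {k: v for k, v in kwargs.items() if v is not None}
    if len(provided) != 1:
        raise ValueError(f"provide exactly one of {', '.join(CONNECTOR_KEYS)}; got {sorted(provided)}")
    return provided


# Build tool arguments: the connector selector plus the other documented parameters.
args = {**connector_selector(destination_canonical_name="destination-duckdb"), "status_filter": "succeeded"}
print(args)  # {'destination_canonical_name': 'destination-duckdb', 'status_filter': 'succeeded'}
```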

Key fields in results:

  • job_status: 'succeeded', 'failed', 'cancelled', etc.
  • connection_id, connection_name: The connection that ran the sync
  • actor_id, actor_name: The source or destination actor
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
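For a 'prove fix' workflow, status_filter='succeeded' combined with exclude_pinned=True already narrows the results on the server. The sketch below shows the equivalent client-side reduction, for example when reusing a status_filter='all' result. It assumes that NULL pin fields arrive as Python None. It keeps each connection_id once, in order of first appearance. unpinned_healthy_connections is a hypothetical helper.

```python
from typing import Any, Dict, List


def unpinned_healthy_connections(rows: List[Dict[str, Any]]) -> List[str]:
    """Connection IDs that had a succeeded sync and no effective version pin."""
    seen: Dict[str, None] = {}  # dict keeps insertion order and dedupes keys
    for row in rows:
        if row.get("job_status") == "succeeded" and row.get("pin_scope_type") is None:
            seen.setdefault(row["connection_id"], None)
    return list(seen)


rows = [
    {"connection_id": "c1", "job_status": "succeeded", "pin_scope_type": None},
    {"connection_id": "c1", "job_status": "succeeded", "pin_scope_type": None},
    {"connection_id": "c2", "job_status": "failed", "pin_scope_type": None},
    {"connection_id": "c3", "job_status": "succeeded", "pin_scope_type": "workspace"},
    {"connection_id": "c4", "job_status": "succeeded", "pin_scope_type": None},
]
print(unpinned_healthy_connections(rows))  # ['c1', 'c4']
```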

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'.
destination_definition_id string | null no null Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB.
destination_canonical_name string | null no null Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'.
status_filter enum("all", "succeeded", "failed") no "all" Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
lookback_days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
exclude_pinned boolean no false If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
    },
    "status_filter": {
      "description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_recent_syncs_for_version_pinned_connector

Hints: read-only · idempotent

List sync job results for actors PINNED to a specific connector version.

IMPORTANT: This tool ONLY returns results for actors that are effectively pinned to the specified version via scoped_configuration at any scope level (actor, workspace, or organization). Most connections run unpinned and will NOT appear in these results.
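"Effectively pinned" means that a pin is resolved across the three scopes. query_prod_actors_by_pinned_connector_version documents actor > workspace > organization precedence, and this sketch assumes the same resolution applies here. Under that assumption, an actor appears in the results only when its effective pin equals the requested version. effective_pin and its per-scope inputs are hypothetical.

```python
from typing import Optional, Tuple


def effective_pin(
    actor_pin: Optional[str],
    workspace_pin: Optional[str],
    organization_pin: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Return (pinned_version_id, pin_scope_type), checking actor, then workspace, then organization."""
    for scope, version_id in (
        ("actor", actor_pin),
        ("workspace", workspace_pin),
        ("organization", organization_pin),
    ):
        if version_id is not None:
            return version_id, scope
    return None, None  # unpinned: the actor runs the connector's default version


print(effective_pin(None, "v-rc", "v-old"))  # ('v-rc', 'workspace')
```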

Use this tool when you want to monitor rollout health for actors that have been pinned to a pre-release or specific version. For finding healthy connections across ALL actors using a connector type (regardless of pinning), use query_prod_recent_syncs_for_connector instead.

The actor_id field is the actor ID (superset of source_id/destination_id).

Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, connector_definition_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
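To check rollout health for each pin scope, the returned jobs can be grouped by pin_scope_type. This is a minimal sketch; success_rate_by_scope is a hypothetical helper. It is only meaningful when successful_only is left at False, because otherwise every returned job has succeeded.

```python
from collections import defaultdict
from typing import Any, Dict, List


def success_rate_by_scope(rows: List[Dict[str, Any]]) -> Dict[str, float]:
    """Fraction of jobs with job_status == 'succeeded', per pin_scope_type."""
    totals: Dict[str, int] = defaultdict(int)
    successes: Dict[str, int] = defaultdict(int)
    for row in rows:
        scope = row["pin_scope_type"]
        totals[scope] += 1
        if row["job_status"] == "succeeded":
            successes[scope] += 1
    return {scope: successes[scope] / totals[scope] for scope in totals}


rows = [
    {"pin_scope_type": "actor", "job_status": "succeeded"},
    {"pin_scope_type": "actor", "job_status": "failed"},
    {"pin_scope_type": "workspace", "job_status": "succeeded"},
]
print(success_rate_by_scope(rows))  # {'actor': 0.5, 'workspace': 1.0}
```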

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_version_id | string | yes | | Connector version UUID to find sync results for |
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| successful_only | boolean | no | false | If True, only return successful syncs (default: False) |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find sync results for",
      "type": "string"
    },
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "successful_only": {
      "default": false,
      "description": "If True, only return successful syncs (default: False)",
      "type": "boolean"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}
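
MCP clients normally build the invocation for you. The sketch below only makes the wire format visible: it follows the JSON-RPC 2.0 `tools/call` request shape from the MCP specification, checks the arguments against the property names and the `required` list in the input schema above (which also sets `additionalProperties: false`), and uses a placeholder UUID. `build_tools_call` is a hypothetical helper, not part of this server.

```python
import json
from typing import Any

# Property names and required keys copied from the input schema above.
ALLOWED_ARGS = {"connector_version_id", "days", "limit", "successful_only"}
REQUIRED_ARGS = {"connector_version_id"}


def build_tools_call(request_id: int, name: str, arguments: dict[str, Any]) -> str:
    """Serialize an MCP `tools/call` JSON-RPC 2.0 request after a light arg check."""
    unknown = set(arguments) - ALLOWED_ARGS  # schema sets additionalProperties: false
    missing = REQUIRED_ARGS - set(arguments)
    if unknown or missing:
        raise ValueError(f"unknown={sorted(unknown)} missing={sorted(missing)}")
    return json.dumps(
        {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": "tools/call",
            "params": {"name": name, "arguments": arguments},
        }
    )


payload = build_tools_call(
    1,
    "query_prod_recent_syncs_for_version_pinned_connector",
    {
        # Placeholder UUID: substitute a real connector version ID.
        "connector_version_id": "00000000-0000-0000-0000-000000000000",
        "days": 3,  # omitted keys fall back to the defaults (limit=100)
        "successful_only": True,
    },
)
print(payload)
```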

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
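
Because this output schema carries `x-fastmcp-wrap-result`, the list of rows arrives wrapped as `{"result": [...]}`. The minimal rollout-health sketch below unwraps that object and computes the share of succeeded jobs per `pin_scope_type`. It assumes `job_status` uses the value `'succeeded'`, as documented for `query_prod_recent_syncs_for_connector`. The sample payload is invented for illustration, and `success_rate_by_scope` is hypothetical.

```python
from collections import defaultdict
from typing import Any


def success_rate_by_scope(structured: dict[str, Any]) -> dict[str, float]:
    """Fraction of succeeded jobs per pin_scope_type, from wrapped tool output."""
    totals: dict[str, int] = defaultdict(int)
    successes: dict[str, int] = defaultdict(int)
    for row in structured["result"]:  # unwrap the result envelope
        scope = row.get("pin_scope_type") or "unknown"
        totals[scope] += 1
        if row.get("job_status") == "succeeded":
            successes[scope] += 1
    return {scope: successes[scope] / totals[scope] for scope in totals}


# Invented payload shaped like the output schema above.
sample = {
    "result": [
        {"job_id": 1, "job_status": "succeeded", "pin_scope_type": "actor"},
        {"job_id": 2, "job_status": "failed", "pin_scope_type": "actor"},
        {"job_id": 3, "job_status": "succeeded", "pin_scope_type": "organization"},
    ]
}
print(success_rate_by_scope(sample))  # {'actor': 0.5, 'organization': 1.0}
```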

query_prod_workspace_info

Hints: read-only · idempotent

Get workspace information including dataplane group.

Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.

Returns dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone; returns None if the workspace is not found.
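
Because the tool returns `None` for an unknown workspace, a caller must handle that case before it checks the region. The sketch below assumes that the EU dataplane is identified by a `dataplane_name` of `'EU'`, which is one of the example names used in this module. Confirm this against `query_prod_dataplanes` output before relying on it. `is_eu_workspace` is a hypothetical helper.

```python
from typing import Any, Optional


def is_eu_workspace(info: Optional[dict[str, Any]]) -> Optional[bool]:
    """Return True/False for a found workspace, or None if it was not found."""
    if info is None:
        return None  # the tool reports "workspace not found" as None
    # Assumption: the EU dataplane is named "EU" (compared case-insensitively).
    name = info.get("dataplane_name") or ""
    return name.upper() == "EU"


print(is_eu_workspace({"workspace_id": "ws-1", "dataplane_name": "EU"}))  # True
print(is_eu_workspace({"workspace_id": "ws-2", "dataplane_name": "US-Central"}))  # False
print(is_eu_workspace(None))  # None
```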

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string or enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | | Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    }
  },
  "required": [
    "workspace_id"
  ],
  "type": "object"
}
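
The `anyOf` above lets `workspace_id` be either a raw UUID or an alias. The server resolves aliases with `WorkspaceAliasEnum.resolve()`, but that implementation is not shown here. The following is only a minimal stand-in. It maps the documented `@devin-ai-sandbox` alias to the single UUID in the enum above and passes any other value through unchanged. `WORKSPACE_ALIASES` and `resolve_workspace_id` are hypothetical names.

```python
from typing import Optional

# Alias -> workspace UUID, mirroring the single enum value in the schema above.
WORKSPACE_ALIASES = {
    "@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61",
}


def resolve_workspace_id(value: Optional[str]) -> Optional[str]:
    """Map a known alias to its UUID; return UUIDs (and None) unchanged."""
    if value is None:
        return None
    return WORKSPACE_ALIASES.get(value, value)


print(resolve_workspace_id("@devin-ai-sandbox"))
print(resolve_workspace_id("00000000-0000-0000-0000-000000000000"))
```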

Output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_workspaces_by_email_domain

Hints: read-only · idempotent

Find workspaces by email domain.

This tool searches for workspaces where users have email addresses matching the specified domain. This is useful for identifying workspaces that belong to a specific company; for example, searching for "motherduck.com" finds workspaces belonging to MotherDuck employees.

Use cases:

  • Finding partner organization connections for testing connector fixes
  • Identifying internal test accounts for specific integrations
  • Locating workspaces belonging to technology partners

The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.
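
The tool already returns `unique_organization_ids` for the full result. If you need the same list for a filtered subset, such as non-EU workspaces only, before passing the IDs to another tool, the sketch below recomputes it in first-seen order. The source does not document whether the server preserves order. `unique_org_ids` is a hypothetical helper, and the sample workspaces are placeholders.

```python
from typing import Any


def unique_org_ids(workspaces: list[dict[str, Any]], skip_eu: bool = False) -> list[str]:
    """De-duplicate organization IDs in first-seen order, optionally skipping EU."""
    seen: set[str] = set()
    ordered: list[str] = []
    for ws in workspaces:
        if skip_eu and ws.get("is_eu"):
            continue
        org_id = ws["organization_id"]
        if org_id not in seen:
            seen.add(org_id)
            ordered.append(org_id)
    return ordered


workspaces = [
    {"workspace_id": "ws-1", "organization_id": "org-a", "is_eu": False},
    {"workspace_id": "ws-2", "organization_id": "org-a", "is_eu": False},
    {"workspace_id": "ws-3", "organization_id": "org-b", "is_eu": True},
]
print(unique_org_ids(workspaces))  # ['org-a', 'org-b']
print(unique_org_ids(workspaces, skip_eu=True))  # ['org-a']
```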

Parameters:

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| email_domain | string | yes | | Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain. |
| limit | integer | no | 100 | Maximum number of workspaces to return (default: 100) |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "email_domain": {
      "description": "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.",
      "type": "string"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of workspaces to return (default: 100)",
      "type": "integer"
    }
  },
  "required": [
    "email_domain"
  ],
  "type": "object"
}
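
Because `email_domain` must be passed without the `@` symbol, a caller that starts from a full address or a pasted `@domain` may want to normalize the value first. The minimal sketch below keeps everything after the last `@` and does not change case. `normalize_email_domain` is a hypothetical helper.

```python
def normalize_email_domain(value: str) -> str:
    """Turn 'user@example.com', '@example.com', or 'example.com' into 'example.com'."""
    domain = value.strip().rsplit("@", 1)[-1]
    if not domain:
        raise ValueError(f"No domain found in {value!r}")
    return domain


print(normalize_email_domain("jane@motherduck.com"))  # motherduck.com
print(normalize_email_domain("@motherduck.com"))  # motherduck.com
print(normalize_email_domain("motherduck.com"))  # motherduck.com
```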

Output JSON schema

{
  "description": "Result of looking up workspaces by email domain.",
  "properties": {
    "email_domain": {
      "description": "The email domain that was searched for (e.g., 'motherduck.com')",
      "type": "string"
    },
    "total_workspaces_found": {
      "description": "Total number of workspaces matching the email domain",
      "type": "integer"
    },
    "unique_organization_ids": {
      "description": "List of unique organization IDs found",
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "workspaces": {
      "description": "List of workspaces matching the email domain",
      "items": {
        "description": "Information about a workspace found by email domain search.",
        "properties": {
          "organization_id": {
            "description": "The organization UUID",
            "type": "string"
          },
          "workspace_id": {
            "description": "The workspace UUID",
            "type": "string"
          },
          "workspace_name": {
            "description": "The name of the workspace",
            "type": "string"
          },
          "slug": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The workspace slug (URL-friendly identifier)"
          },
          "email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The email address associated with the workspace"
          },
          "dataplane_group_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The dataplane group UUID (region)"
          },
          "dataplane_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The name of the dataplane (e.g., 'US', 'EU')"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the workspace was created"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
          },
          "is_eu": {
            "anyOf": [
              {
                "type": "boolean"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Whether the workspace is in the EU region (derived from dataplane_name)."
          }
        },
        "required": [
          "organization_id",
          "workspace_id",
          "workspace_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "email_domain",
    "total_workspaces_found",
    "unique_organization_ids",
    "workspaces"
  ],
  "type": "object"
}
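
On the client side, the output schema above maps naturally onto typed records. The sketch below mirrors the three required workspace fields and a few of the optional ones with standard-library dataclasses rather than the server's Pydantic models, and it fails fast when a required key is missing. `WorkspaceRecord` and `parse_workspaces` are hypothetical names, and the payload is a placeholder.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WorkspaceRecord:
    organization_id: str
    workspace_id: str
    workspace_name: str
    # Optional fields default to None, as in the schema (subset only).
    dataplane_name: Optional[str] = None
    customer_tier: Optional[str] = None
    is_eu: Optional[bool] = None


REQUIRED = ("organization_id", "workspace_id", "workspace_name")


def parse_workspaces(payload: dict[str, Any]) -> list[WorkspaceRecord]:
    """Build records from the tool output, failing fast on missing required keys."""
    records = []
    for ws in payload["workspaces"]:
        missing = [key for key in REQUIRED if key not in ws]
        if missing:
            raise ValueError(f"Workspace entry missing {missing}")
        records.append(
            WorkspaceRecord(
                organization_id=ws["organization_id"],
                workspace_id=ws["workspace_id"],
                workspace_name=ws["workspace_name"],
                dataplane_name=ws.get("dataplane_name"),
                customer_tier=ws.get("customer_tier"),
                is_eu=ws.get("is_eu"),
            )
        )
    return records


payload = {
    "email_domain": "motherduck.com",
    "total_workspaces_found": 1,
    "unique_organization_ids": ["org-a"],
    "workspaces": [
        {"organization_id": "org-a", "workspace_id": "ws-1", "workspace_name": "Demo"}
    ],
}
print(parse_workspaces(payload)[0].is_eu)  # None
```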

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from
airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

## MCP reference

.. include:: ../../../docs/mcp-generated/prod_db_queries.md
    :start-line: 2
"""

from __future__ import annotations

__all__: list[str] = []

import json
from datetime import datetime, timezone
from enum import StrEnum
from typing import Annotated, Any

from airbyte.exceptions import PyAirbyteInputError
from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte_ops_mcp.cloud_admin.registry_lookup import (
    resolve_canonical_name_to_definition_id,
)
from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum
from airbyte_ops_mcp.prod_db_access.queries import (
    query_actors_pinned_to_version,
    query_connection_sync_activity_from_prod,
    query_connections_by_connector,
    query_connections_by_destination_connector,
    query_connections_by_stream,
    query_connector_rollouts,
    query_connector_versions,
    query_dataplanes_list,
    query_destination_connection_stats,
    query_failed_sync_attempts_for_connector,
    query_new_connector_releases,
    query_recent_syncs_for_connector,
    query_source_connection_stats,
    query_syncs_for_version_pinned_connector,
    query_workspace_info,
    query_workspaces_by_email_domain,
)
from airbyte_ops_mcp.tier_cache import (
    TierFilter,
    enrich_rows_by_org,
    filter_rows_by_tier,
    get_org_tiers,
)


class StatusFilter(StrEnum):
    """Filter for job status in sync queries."""

    ALL = "all"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Cloud UI base URL for building connection URLs
CLOUD_UI_BASE_URL = "https://cloud.airbyte.com"


def _validate_sync_activity_scope(
    *,
    organization_id: str | None,
    workspace_id: str | None,
    connection_ids: list[str] | None,
) -> None:
    """Require at least one explicit scope filter for sync activity queries."""
    if organization_id or workspace_id or connection_ids:
        return
    raise PyAirbyteInputError(
        message=(
            "Provide at least one scope filter: `organization_id`, `workspace_id`, "
            "or `connection_ids`."
        ),
        context={
            "organization_id": organization_id,
            "workspace_id": workspace_id,
            "connection_ids": connection_ids,
        },
    )


def _validate_sync_activity_window(
    *,
    start_at: datetime,
    end_at: datetime,
) -> tuple[datetime, datetime]:
    """Validate that `start_at` and `end_at` describe a usable window.

    Returns the timestamps normalized to UTC. Raises `PyAirbyteInputError` for
    naive timestamps or inverted ranges. No clock-relative caps are enforced
    here; the caller is trusted to choose a sensible window.
    """
    if start_at.tzinfo is None or end_at.tzinfo is None:
        raise PyAirbyteInputError(
            message="`start_at` and `end_at` must include timezone information.",
            context={
                "start_at": start_at.isoformat(),
                "end_at": end_at.isoformat(),
            },
        )

    normalized_start = start_at.astimezone(timezone.utc)
    normalized_end = end_at.astimezone(timezone.utc)

    if normalized_start >= normalized_end:
        raise PyAirbyteInputError(
            message="`start_at` must be earlier than `end_at`.",
            context={
                "start_at": normalized_start.isoformat(),
                "end_at": normalized_end.isoformat(),
            },
        )
    return normalized_start, normalized_end


# =============================================================================
# Pydantic Models for MCP Tool Responses
# =============================================================================


class WorkspaceInfo(BaseModel):
    """Information about a workspace found by email domain search."""

    organization_id: str = Field(description="The organization UUID")
    workspace_id: str = Field(description="The workspace UUID")
    workspace_name: str = Field(description="The name of the workspace")
    slug: str | None = Field(
        default=None, description="The workspace slug (URL-friendly identifier)"
    )
    email: str | None = Field(
        default=None, description="The email address associated with the workspace"
    )
    dataplane_group_id: str | None = Field(
        default=None, description="The dataplane group UUID (region)"
    )
    dataplane_name: str | None = Field(
        default=None, description="The name of the dataplane (e.g., 'US', 'EU')"
    )
    created_at: datetime | None = Field(
        default=None, description="When the workspace was created"
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
    )
    is_eu: bool | None = Field(
        default=None,
        description="Whether the workspace is in the EU region (derived from dataplane_name).",
    )


class WorkspacesByEmailDomainResult(BaseModel):
    """Result of looking up workspaces by email domain."""

    email_domain: str = Field(
        description="The email domain that was searched for (e.g., 'motherduck.com')"
    )
    total_workspaces_found: int = Field(
        description="Total number of workspaces matching the email domain"
    )
    unique_organization_ids: list[str] = Field(
        description="List of unique organization IDs found"
    )
    workspaces: list[WorkspaceInfo] = Field(
        description="List of workspaces matching the email domain"
    )


class LatestAttemptBreakdown(BaseModel):
    """Breakdown of connections by latest attempt status."""

    succeeded: int = Field(
        default=0, description="Connections where latest attempt succeeded"
    )
    failed: int = Field(
        default=0, description="Connections where latest attempt failed"
    )
    cancelled: int = Field(
        default=0, description="Connections where latest attempt was cancelled"
    )
    running: int = Field(
        default=0, description="Connections where latest attempt is still running"
    )
    unknown: int = Field(
        default=0,
        description="Connections with no recent attempts in the lookback window",
    )


class VersionPinStats(BaseModel):
    """Stats for connections pinned to a specific version."""

    pinned_version_id: str | None = Field(
        description="The connector version UUID (None for unpinned connections)"
    )
    docker_image_tag: str | None = Field(
        default=None, description="The docker image tag for this version"
    )
    total_connections: int = Field(description="Total number of connections")
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Breakdown by latest attempt status"
    )


class ConnectorConnectionStats(BaseModel):
    """Aggregate connection stats for a connector."""

    connector_definition_id: str = Field(description="The connector definition UUID")
    connector_type: str = Field(description="'source' or 'destination'")
    canonical_name: str | None = Field(
        default=None, description="The canonical connector name if resolved"
    )
    total_connections: int = Field(
        description="Total number of non-deprecated connections"
    )
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    pinned_connections: int = Field(
        description="Number of connections with explicit version pins"
    )
    unpinned_connections: int = Field(
        description="Number of connections on default version"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Overall breakdown by latest attempt status"
    )
    by_version: list[VersionPinStats] = Field(
        description="Stats broken down by pinned version"
    )


class ConnectorConnectionStatsResponse(BaseModel):
    """Response containing connection stats for multiple connectors."""

    sources: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for source connectors"
    )
    destinations: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for destination connectors"
    )
    lookback_days: int = Field(
        description="Lookback window used for 'active' connections"
    )
    generated_at: datetime = Field(description="When this response was generated")


def _opt_str(value: Any) -> str | None:
    """Convert a nullable value to str, returning None if the value is None/falsy."""
    return str(value) if value else None


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_dataplanes() -> list[dict[str, Any]]:
    """List all dataplane groups with workspace counts.

    Returns information about all active dataplane groups in Airbyte Cloud,
    including the number of workspaces in each. Useful for understanding
    the distribution of workspaces across regions (US, US-Central, EU).

    Returns list of dicts with keys: dataplane_group_id, dataplane_name,
    organization_id, enabled, tombstone, created_at, workspace_count
    """
    return query_dataplanes_list()


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspace_info(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="Workspace UUID or alias to look up. "
            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
        ),
    ],
) -> dict[str, Any] | None:
    """Get workspace information including dataplane group.

    Returns details about a specific workspace, including which dataplane
    (region) it belongs to. Useful for determining if a workspace is in
    the EU region for filtering purposes.

    Returns dict with keys: workspace_id, workspace_name, slug, organization_id,
    dataplane_group_id, dataplane_name, created_at, tombstone
    Or None if workspace not found.
    """
    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required

    return query_workspace_info(resolved_workspace_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_versions(
    connector_definition_id: Annotated[
        str,
        Field(description="Connector definition UUID to list versions for"),
    ],
) -> list[dict[str, Any]]:
    """List all versions for a connector definition.

    Returns all published versions of a connector, ordered by last_published
    date descending. Useful for understanding version history and finding
    specific version IDs for pinning or rollout monitoring.

    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
    release_stage, support_level, cdk_version, language, last_published, release_date
    """
    return query_connector_versions(connector_definition_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_new_connector_releases(
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
) -> list[dict[str, Any]]:
    """List recently published connector versions.

    Returns connector versions published within the specified number of days.
    Uses last_published timestamp which reflects when the version was actually
    deployed to the registry (not the changelog date).

    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
    docker_image_tag, last_published, release_date, release_stage, support_level,
    cdk_version, language, created_at
    """
    return query_new_connector_releases(days=days, limit=limit)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_actors_by_pinned_connector_version(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find pinned instances for"),
    ],
) -> list[dict[str, Any]]:
    """List actors (sources/destinations) effectively pinned to a specific connector version.

    Returns all actors that are effectively pinned to a specific connector version,
    considering all scope levels: actor-level pins, workspace-level pins, and
    organization-level pins (with actor > workspace > organization precedence).
    Useful for monitoring rollouts and understanding which customers are affected.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
    origin, description, created_at, expires_at, pin_scope_type, actor_name,
    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_actors_pinned_to_version(connector_version_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_recent_syncs_for_version_pinned_connector(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find sync results for"),
    ],
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    successful_only: Annotated[
        bool,
        Field(
            description="If True, only return successful syncs (default: False)",
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """List sync job results for actors PINNED to a specific connector version.

    IMPORTANT: This tool ONLY returns results for actors that are effectively
    pinned to the specified version via scoped_configuration at any scope level
    (actor, workspace, or organization). Most connections run unpinned and will
    NOT appear in these results.

    Use this tool when you want to monitor rollout health for actors that have been
    pinned to a pre-release or specific version. For finding healthy connections
    across ALL actors using a connector type (regardless of pinning),
    use query_prod_recent_syncs_for_connector instead.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: job_id, connection_id, job_status, started_at,
    job_updated_at, connection_name, actor_id, actor_name, connector_definition_id,
    pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name,
    organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_syncs_for_version_pinned_connector(
        connector_version_id,
        days=days,
        limit=limit,
        successful_only=successful_only,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_recent_syncs_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ],
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Provide this OR destination_canonical_name OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ],
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Provide this OR destination_definition_id OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ],
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
                "Use 'succeeded' to find healthy connections with recent successful syncs. "
                "Use 'failed' to find connections with recent failures."
            ),
            default=StatusFilter.ALL,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only syncs from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude syncs for actors that are already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all syncs)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """List recent sync jobs for ALL actors using a connector type.

    This tool finds all actors with the given connector definition and returns their
    recent sync jobs, regardless of whether they have explicit version pins. It filters
    out deleted actors, deleted workspaces, and deprecated connections.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use this tool to:
    - Find healthy connections with recent successful syncs (status_filter='succeeded')
    - Investigate connector issues across all users (status_filter='failed')
    - Get an overview of all recent sync activity (status_filter='all')

    Set exclude_pinned=True to filter out syncs for actors that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
    source_definition_id, source_canonical_name, destination_definition_id,
    or destination_canonical_name.

    Key fields in results:
    - job_status: 'succeeded', 'failed', 'cancelled', etc.
    - connection_id, connection_name: The connection that ran the sync
    - actor_id, actor_name: The source or destination actor
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one connector parameter is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a destination connector
    is_destination = (
        destination_definition_id is not None or destination_canonical_name is not None
    )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif destination_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    else:
        # We've validated exactly one param is provided, so this must be set
        assert destination_definition_id is not None
        resolved_definition_id = destination_definition_id

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_recent_syncs_for_connector(
 632        connector_definition_id=resolved_definition_id,
 633        is_destination=is_destination,
 634        status_filter=status_filter,
 635        organization_id=resolved_organization_id,
 636        days=lookback_days,
 637        limit=limit,
 638        exclude_pinned=exclude_pinned,
 639    )
 640
 641    enriched = enrich_rows_by_org(rows)
 642    return filter_rows_by_tier(enriched, customer_tier_filter)
 643
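
# Illustrative sketch, not used by the tools in this module: the hypothetical
# helper below factors out the "exactly one connector argument" check that
# query_prod_connection_sync_activity and query_prod_connections_by_connector
# perform inline. It returns the name and value of the single argument that
# was set and whether that argument names a destination. ValueError stands in
# for PyAirbyteInputError so the sketch runs without this module's imports.
def _example_pick_single_connector_arg(
    **candidates: str | None,
) -> tuple[str, str, bool]:
    """Return (arg_name, value, is_destination) for the one non-None argument."""
    # Count with `is not None`, as the tools do, so "" still counts as provided.
    provided = {name: value for name, value in candidates.items() if value is not None}
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one of {', '.join(candidates)} must be provided; "
            f"got {len(provided)}."
        )
    name, value = next(iter(provided.items()))
    # Argument names in this module are prefixed with "source_" or "destination_".
    return name, value, name.startswith("destination_")


# Example (hypothetical values):
# _example_pick_single_connector_arg(
#     source_definition_id=None, destination_canonical_name="destination-duckdb"
# )  # -> ("destination_canonical_name", "destination-duckdb", True)
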

@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_failed_sync_attempts_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of this or source_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of this or source_definition_id is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only failed attempts from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' "
                "(default: 'TIER_2'). "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """List failed sync attempts for ALL actors using a source connector type.

    This tool finds all actors with the given connector definition and returns their
    failed sync attempts, regardless of whether they have explicit version pins.

    Results are always enriched with customer_tier and is_eu fields.
    customer_tier_filter defaults to 'TIER_2'; pass 'ALL' to include every tier.

    This is useful for investigating connector issues across all users. Use this when
    you want to find failures for a connector type regardless of which version users
    are on.

    Note: This tool supports SOURCE connectors only; destination connectors are not
    supported.

    Key fields in results:
    - failure_summary: JSON containing failure details including failureType and messages
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one of the two parameters is provided
    if (source_definition_id is None) == (source_canonical_name is None):
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided, but not both."
            ),
        )

    # Resolve canonical name to definition ID if needed. Use a None check (not
    # truthiness) so the branch taken agrees with the validation above.
    resolved_definition_id: str
    if source_canonical_name is not None:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)

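
# Illustrative sketch: filter_rows_by_tier is defined elsewhere and its real
# implementation may differ. The hypothetical helper below only mirrors the
# behavior described by the customer_tier_filter parameter: 'ALL' keeps every
# row, while 'TIER_0', 'TIER_1', or 'TIER_2' keeps only rows whose
# customer_tier (added by enrich_rows_by_org) matches exactly. In this sketch,
# rows with no tier are dropped by any filter other than 'ALL'.
def _example_filter_rows_by_tier(rows: list[dict], tier_filter: str) -> list[dict]:
    """Return the enriched rows that belong to the requested customer tier."""
    if tier_filter == "ALL":
        return list(rows)
    return [row for row in rows if row.get("customer_tier") == tier_filter]
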

@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 1000)", default=1000),
    ] = 1000,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' "
                "(default: 'TIER_2'). "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude connections whose connector is already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all connections)."
            ),
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """Search for all connections using a specific source or destination connector type.

    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
    It finds all connections where the source or destination connector matches the
    specified type, regardless of how the connector is named by users.

    Results are always enriched with customer_tier and is_eu fields.
    customer_tier_filter defaults to 'TIER_2'; pass 'ALL' to include every tier.

    Optionally filter by organization_id to limit results to a specific organization.
    Use '@airbyte-internal' as an alias for the Airbyte internal organization.

    Set exclude_pinned=True to filter out connections that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    For source queries, returns: connection_id, connection_name, connection_url, source_id,
    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
    pin_scope_type, customer_tier, is_eu.
    For destination queries, returns: connection_id, connection_name, connection_url,
    destination_id, destination_name, destination_definition_id, workspace_id,
    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from (NULL if not pinned).
    """
    # Validate that exactly one of the four connector parameters is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a source or destination query and resolve the definition ID.
    # Use None checks (not truthiness) so the branch taken agrees with the validation.
    is_source_query = (
        source_definition_id is not None or source_canonical_name is not None
    )
    resolved_definition_id: str

    if source_canonical_name is not None:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif source_definition_id is not None:
        resolved_definition_id = source_definition_id
    elif destination_canonical_name is not None:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    else:
        resolved_definition_id = destination_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    # Query the database based on connector type
    if is_source_query:
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "source_id": str(row["source_id"]),
                "source_name": row.get("source_name", ""),
                "source_definition_id": str(row["source_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]
    else:
        # Destination query
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "destination_id": str(row["destination_id"]),
                "destination_name": row.get("destination_name", ""),
                "destination_definition_id": str(row["destination_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_destination_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)

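
# Illustrative sketch (hypothetical helper): pin_scope_type reports which scope
# level an effective version pin came from. Pins can exist at the actor,
# workspace, and organization levels, with actor > workspace > organization
# precedence. The prod_db_access query functions presumably resolve this in
# SQL; the helper below only models the precedence rule and returns
# (None, None) when no scope is pinned, matching the NULL pin fields above.
_EXAMPLE_PIN_PRECEDENCE = ("actor", "workspace", "organization")


def _example_effective_pin(
    pins_by_scope: dict[str, str | None],
) -> tuple[str | None, str | None]:
    """Return (pinned_version_id, pin_scope_type) for the highest-precedence pin."""
    for scope in _EXAMPLE_PIN_PRECEDENCE:
        version_id = pins_by_scope.get(scope)
        if version_id is not None:
            return version_id, scope
    return None, None
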

@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' "
                "(default: 'TIER_2'). "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It is particularly useful when validating
    connector fixes that affect specific streams, since it quickly finds
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    customer_tier_filter defaults to 'TIER_2'; pass 'ALL' to include every tier.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
            ),
        )

    # Use a None check (not truthiness) so the branch taken agrees with the validation.
    resolved_definition_id: str
    if source_canonical_name is not None:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        assert source_definition_id is not None
        resolved_definition_id = source_definition_id

    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = [
        {
            "organization_id": str(row.get("organization_id", "")),
            "workspace_id": str(row["workspace_id"]),
            "workspace_name": row.get("workspace_name", ""),
            "connection_id": str(row["connection_id"]),
            "connection_name": row.get("connection_name", ""),
            "connection_status": row.get("connection_status", ""),
            "connection_url": (
                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                f"/connections/{row['connection_id']}/status"
            ),
            "source_id": str(row["source_id"]),
            "source_name": row.get("source_name", ""),
            "source_definition_id": str(row["source_definition_id"]),
            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
            "dataplane_name": row.get("dataplane_name", ""),
        }
        for row in query_connections_by_stream(
            connector_definition_id=resolved_definition_id,
            stream_name=stream_name,
            organization_id=resolved_organization_id,
            limit=limit,
        )
    ]
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)

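
# Illustrative sketch: the stream match for query_prod_connections_by_stream
# runs in SQL against the connection's configured catalog (JSONB). The
# hypothetical helper below shows the same exact-name test in Python, assuming
# the catalog follows the Airbyte protocol ConfiguredAirbyteCatalog layout,
# {"streams": [{"stream": {"name": ...}, ...}, ...]}. The JSONB stored in the
# Prod DB may be shaped differently, so treat this as a model of the rule only.
def _example_catalog_has_stream(configured_catalog: dict, stream_name: str) -> bool:
    """Return True if a configured stream's name equals stream_name exactly."""
    for entry in configured_catalog.get("streams") or []:
        stream = entry.get("stream") if isinstance(entry, dict) else None
        # Exact, case-sensitive comparison, per the stream_name description.
        if isinstance(stream, dict) and stream.get("name") == stream_name:
            return True
    return False
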

@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspaces_by_email_domain(
    email_domain: Annotated[
        str,
        Field(
            description=(
                "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). "
                "Omit the '@' symbol; a leading '@' is stripped if present. This will "
                "find workspaces where users have email addresses with this domain."
            ),
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of workspaces to return (default: 100)",
            default=100,
        ),
    ] = 100,
) -> WorkspacesByEmailDomainResult:
    """Find workspaces by email domain.

    This tool searches for workspaces where users have email addresses matching
    the specified domain. This is useful for identifying workspaces belonging to
    specific companies. For example, searching for "motherduck.com" will find
    workspaces belonging to MotherDuck employees.

    Use cases:
    - Finding partner organization connections for testing connector fixes
    - Identifying internal test accounts for specific integrations
    - Locating workspaces belonging to technology partners

    The returned organization IDs can be used with other tools like
    `query_prod_connections_by_connector` to find connections within
    those organizations for safe testing.
    """
    # Strip leading @ if provided
    clean_domain = email_domain.lstrip("@")

    # Query the database
    rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)

    # Convert rows to Pydantic models
    workspaces = [
        WorkspaceInfo(
            organization_id=str(row["organization_id"]),
            workspace_id=str(row["workspace_id"]),
            workspace_name=row.get("workspace_name", ""),
            slug=row.get("slug"),
            email=row.get("email"),
            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
            dataplane_name=row.get("dataplane_name"),
            created_at=row.get("created_at"),
        )
        for row in rows
    ]

    # Enrich with tier annotation (annotation only, no filtering)
    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
    for ws in workspaces:
        tier_result = tier_results.get(ws.organization_id)
        if tier_result:
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspacesByEmailDomainResult(
        email_domain=clean_domain,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )

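
# Illustrative sketch (hypothetical helper): the domain clean-up and the
# order-preserving de-duplication of organization IDs that
# query_prod_workspaces_by_email_domain performs, applied to plain
# (organization_id, email) pairs. The case-insensitive domain comparison is an
# assumption of this sketch; the SQL behind query_workspaces_by_email_domain is
# not shown here and may match differently.
def _example_org_ids_for_domain(
    org_email_pairs: list[tuple[str, str]],
    email_domain: str,
) -> list[str]:
    """Return unique organization IDs whose user email is at email_domain."""
    domain = email_domain.lstrip("@").lower()
    matching: list[str] = []
    for org_id, email in org_email_pairs:
        # rpartition splits on the last "@"; at_sign is "" if there is none.
        _, at_sign, host = email.rpartition("@")
        if at_sign and host.lower() == domain:
            matching.append(org_id)
    # dict.fromkeys drops duplicates but keeps first-seen order.
    return list(dict.fromkeys(matching))
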

def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
        total_unknown += row_unknown

        by_version.append(
            VersionPinStats(
                pinned_version_id=str(version_id) if version_id else None,
                docker_image_tag=version_tags.get(str(version_id))
                if version_id
                else None,
                total_connections=row_total,
                enabled_connections=row_enabled,
                active_connections=row_active,
                latest_attempt=LatestAttemptBreakdown(
                    succeeded=row_succeeded,
                    failed=row_failed,
                    cancelled=row_cancelled,
                    running=row_running,
                    unknown=row_unknown,
                ),
            )
        )

    return ConnectorConnectionStats(
        connector_definition_id=connector_definition_id,
        connector_type=connector_type,
        canonical_name=canonical_name,
        total_connections=total_connections,
        enabled_connections=enabled_connections,
        active_connections=active_connections,
        pinned_connections=pinned_connections,
        unpinned_connections=unpinned_connections,
        latest_attempt=LatestAttemptBreakdown(
            succeeded=total_succeeded,
            failed=total_failed,
            cancelled=total_cancelled,
            running=total_running,
            unknown=total_unknown,
        ),
        by_version=by_version,
    )

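
# Illustrative sketch (hypothetical helpers): _build_connector_stats sums ten
# count columns one variable at a time. The first helper expresses the same
# "missing counts default to 0" summation as a single loop over column names.
# The second turns a latest-attempt breakdown into a failure rate; defining it
# as failed / (succeeded + failed), ignoring cancelled, running, and unknown,
# is an assumption for illustration and not a metric this module reports.
def _example_sum_count_columns(
    rows: list[dict],
    columns: tuple[str, ...],
) -> dict[str, int]:
    """Sum integer count columns across rows, treating missing columns as 0."""
    totals = dict.fromkeys(columns, 0)
    for row in rows:
        for column in columns:
            totals[column] += int(row.get(column, 0))
    return totals


def _example_failure_rate(succeeded: int, failed: int) -> float | None:
    """Return failed / (succeeded + failed), or None when neither occurred."""
    finished = succeeded + failed
    return failed / finished if finished else None
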

@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connector_connection_stats(
    source_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of source connector definition IDs (UUIDs) to get stats for. "
                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
            ),
            default=None,
        ),
    ] = None,
    destination_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of destination connector definition IDs (UUIDs) to get stats for. "
                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(
            description=(
                "Number of days to look back for 'active' connections (default: 7). "
                "Connections with sync activity within this window are counted as active."
            ),
            default=7,
        ),
    ] = 7,
) -> ConnectorConnectionStatsResponse:
    """Get aggregate connection stats for multiple connectors.

    Returns counts of connections grouped by pinned version for each connector,
    including:
    - Total, enabled, and active connection counts
    - Pinned vs unpinned breakdown
    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

    This tool is designed for release monitoring workflows. It allows you to:
    1. Query recently released connectors to identify which ones to monitor
    2. Get aggregate stats showing how many connections are using each version
    3. See health metrics (pass/fail) broken down by version

    The `lookback_days` parameter controls the lookback window for:
    - Counting 'active' connections (those with recent sync activity)
    - Determining 'latest attempt status' (most recent attempt within the window)

    Connections with no sync activity in the lookback window will have
    'unknown' status in the latest_attempt breakdown.
    """
    # Initialize empty lists if None
    source_ids = source_definition_ids or []
    destination_ids = destination_definition_ids or []

    if not source_ids and not destination_ids:
        raise PyAirbyteInputError(
            message=(
                "At least one of source_definition_ids or destination_definition_ids "
                "must be provided."
            ),
        )

    sources: list[ConnectorConnectionStats] = []
    destinations: list[ConnectorConnectionStats] = []

    # Process source connectors
    for source_def_id in source_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(source_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_source_connection_stats(source_def_id, days=lookback_days)

        sources.append(
            _build_connector_stats(
                connector_definition_id=source_def_id,
                connector_type="source",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    # Process destination connectors
    for dest_def_id in destination_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(dest_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
1373        }
1374
1375        # Get aggregate stats
1376        rows = query_destination_connection_stats(dest_def_id, days=lookback_days)
1377
1378        destinations.append(
1379            _build_connector_stats(
1380                connector_definition_id=dest_def_id,
1381                connector_type="destination",
1382                canonical_name=None,
1383                rows=rows,
1384                version_tags=version_tags,
1385            )
1386        )
1387
1388    return ConnectorConnectionStatsResponse(
1389        sources=sources,
1390        destinations=destinations,
1391        lookback_days=lookback_days,
1392        generated_at=datetime.now(timezone.utc),
1393    )
1394
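# Illustrative sketch (hypothetical helper, not used by the tool above): one way
# per-version rows could be rolled up once `version_tags` maps version IDs to
# Docker image tags. The row keys `pinned_version_id`, `total_connections`, and
# `enabled_connections` are assumptions; the real row shape comes from
# `query_source_connection_stats` / `query_destination_connection_stats` and is
# consumed by `_build_connector_stats`, which may aggregate differently.
from typing import Any


def _example_rollup_stats_by_version(
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> dict[str, dict[str, int]]:
    """Sum connection counts per Docker image tag (hypothetical helper)."""
    summary: dict[str, dict[str, int]] = {}
    for row in rows:
        version_id = row.get("pinned_version_id")
        if version_id is None:
            # Connections without a pin are grouped into one bucket.
            label = "unpinned"
        else:
            # Fall back to the raw version ID when no tag is known.
            label = version_tags.get(str(version_id)) or str(version_id)
        bucket = summary.setdefault(label, {"total": 0, "enabled": 0})
        bucket["total"] += int(row.get("total_connections") or 0)
        bucket["enabled"] += int(row.get("enabled_connections") or 0)
    return summary
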
1395
1396# =============================================================================
1397# Connector Rollout Models and Tools
1398# =============================================================================
1399
1400
1401class ConnectorRolloutInfo(BaseModel):
1402    """Information about a connector rollout."""
1403
1404    rollout_id: str = Field(description="The rollout UUID")
1405    actor_definition_id: str = Field(description="The connector definition UUID")
1406    state: str = Field(
1407        description="Rollout state: initialized, workflow_started, in_progress, "
1408        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
1409    )
1410    initial_rollout_pct: int | None = Field(
1411        default=None, description="Initial rollout percentage"
1412    )
1413    current_target_rollout_pct: int | None = Field(
1414        default=None, description="Current target rollout percentage"
1415    )
1416    final_target_rollout_pct: int | None = Field(
1417        default=None, description="Final target rollout percentage"
1418    )
1419    has_breaking_changes: bool = Field(
1420        description="Whether the release candidate (RC) has breaking changes"
1421    )
1422    max_step_wait_time_mins: int | None = Field(
1423        default=None, description="Maximum wait time between rollout steps in minutes"
1424    )
1425    rollout_strategy: str | None = Field(
1426        default=None, description="Rollout strategy: manual, automated, overridden"
1427    )
1428    updated_by_user_id: str | None = Field(
1429        default=None,
1430        description="ID of the user who last updated the rollout",
1431    ),
1432    updated_by_user_name: str | None = Field(
1433        default=None,
1434        description="Name of the user who last updated the rollout",
1435    ),
1436    updated_by_user_email: str | None = Field(
1437        default=None,
1438        description="Email of the user who last updated the rollout",
1439    )
1440    workflow_run_id: str | None = Field(
1441        default=None, description="Temporal workflow run ID"
1442    )
1443    error_msg: str | None = Field(default=None, description="Error message if errored")
1444    failed_reason: str | None = Field(
1445        default=None, description="Reason for failure if failed"
1446    )
1447    paused_reason: str | None = Field(
1448        default=None, description="Reason for pause if paused"
1449    )
1450    tag: str | None = Field(default=None, description="Optional tag for the rollout")
1451    created_at: datetime | None = Field(
1452        default=None, description="When the rollout was created"
1453    )
1454    updated_at: datetime | None = Field(
1455        default=None, description="When the rollout was last updated"
1456    )
1457    completed_at: datetime | None = Field(
1458        default=None, description="When the rollout completed (if terminal)"
1459    )
1460    expires_at: datetime | None = Field(
1461        default=None, description="When the rollout expires"
1462    )
1463    rc_docker_image_tag: str | None = Field(
1464        default=None, description="Docker image tag of the release candidate"
1465    )
1466    rc_docker_repository: str | None = Field(
1467        default=None, description="Docker repository of the release candidate"
1468    )
1469    initial_docker_image_tag: str | None = Field(
1470        default=None, description="Docker image tag of the initial version"
1471    )
1472    initial_docker_repository: str | None = Field(
1473        default=None, description="Docker repository of the initial version"
1474    )
1475    filters: dict[str, Any] | None = Field(
1476        default=None,
1477        description="Rollout filters parsed to a dict (e.g., legacy {'tierFilter': {'tier': 'TIER_0'}})",
1478    )
1479    customer_tier: str | None = Field(
1480        default=None,
1481        description="Customer tier targeted by this rollout (extracted from filters), "
1482        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
1483    )
1484
1485
1486def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
1487    """Parse the rollout filters field from a database row.
1488
1489    The filters field may be a JSON string, a dict, or None.
1490    """
1491    if filters_raw is None:
1492        return None
1493    if isinstance(filters_raw, dict):
1494        return filters_raw
1495    if isinstance(filters_raw, str):
1496        try:
1497            parsed = json.loads(filters_raw)
1498            if isinstance(parsed, dict):
1499                return parsed
1500        except (json.JSONDecodeError, TypeError):
1501            pass
1502    return None
1503
1504
1505def _extract_tier_from_filters(filters_raw: Any) -> str | None:
1506    """Extract customer tier from rollout filters JSON.
1507
1508    Supports two formats:
1509    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
1510    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`
1511    """
1512    parsed = _parse_rollout_filters(filters_raw)
1513    if parsed is None:
1514        return None
1515
1516    # Current format: customerTierFilters list
1517    tier_filters = parsed.get("customerTierFilters")
1518    if isinstance(tier_filters, list):
1519        for entry in tier_filters:
1520            if isinstance(entry, dict) and entry.get("name") == "TIER":
1521                values = entry.get("value")
1522                if isinstance(values, list) and len(values) == 1:
1523                    return str(values[0])
1524                if isinstance(values, list) and len(values) > 1:
1525                    return ", ".join(str(v) for v in values)
1526
1527    # Legacy format: tierFilter dict
1528    tier_filter = parsed.get("tierFilter")
1529    if isinstance(tier_filter, dict):
1530        tier = tier_filter.get("tier")
1531        if isinstance(tier, str):
1532            return tier
1533
1534    return None
1535
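
# Illustrative sketch (hypothetical helper, not part of this module's API):
# builds a filters payload in the "current" `customerTierFilters` format that
# `_extract_tier_from_filters` documents, e.g. for test fixtures. Only the keys
# shown in that docstring are assumed. Per the logic above, passing the result
# for ["TIER_1"] should yield "TIER_1", and for ["TIER_0", "TIER_1"] should
# yield "TIER_0, TIER_1".
import json


def _example_build_tier_filters(tiers: list[str]) -> str:
    """Serialize a tier list into a current-format filters JSON string."""
    payload = {
        "customerTierFilters": [
            {"name": "TIER", "value": list(tiers), "operator": "IN"},
        ]
    }
    return json.dumps(payload)
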
1536
1537def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
1538    """Convert a database row to a ConnectorRolloutInfo model."""
1539    return ConnectorRolloutInfo(
1540        rollout_id=str(row["rollout_id"]),
1541        actor_definition_id=str(row["actor_definition_id"]),
1542        state=row["state"],
1543        initial_rollout_pct=row.get("initial_rollout_pct"),
1544        current_target_rollout_pct=row.get("current_target_rollout_pct"),
1545        final_target_rollout_pct=row.get("final_target_rollout_pct"),
1546        has_breaking_changes=row["has_breaking_changes"],
1547        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
1548        rollout_strategy=row.get("rollout_strategy"),
1549        updated_by_user_id=str(row["updated_by_user_id"])
1550        if row.get("updated_by_user_id") is not None
1551        else None,
1552        updated_by_user_name=row.get("updated_by_user_name"),
1553        updated_by_user_email=row.get("updated_by_user_email"),
1554        workflow_run_id=row.get("workflow_run_id"),
1555        error_msg=row.get("error_msg"),
1556        failed_reason=row.get("failed_reason"),
1557        paused_reason=row.get("paused_reason"),
1558        tag=row.get("tag"),
1559        created_at=row.get("created_at"),
1560        updated_at=row.get("updated_at"),
1561        completed_at=row.get("completed_at"),
1562        expires_at=row.get("expires_at"),
1563        rc_docker_image_tag=row.get("rc_docker_image_tag"),
1564        rc_docker_repository=row.get("rc_docker_repository"),
1565        initial_docker_image_tag=row.get("initial_docker_image_tag"),
1566        initial_docker_repository=row.get("initial_docker_repository"),
1567        filters=_parse_rollout_filters(row.get("filters")),
1568        customer_tier=_extract_tier_from_filters(row.get("filters")),
1569    )
1570
1571
1572@mcp_tool(
1573    read_only=True,
1574    idempotent=True,
1575)
1576def query_prod_connector_rollouts(
1577    actor_definition_id: Annotated[
1578        str | None,
1579        Field(description="Connector definition UUID to filter by (optional)"),
1580    ] = None,
1581    rollout_id: Annotated[
1582        str | None,
1583        Field(description="Specific rollout UUID to look up (optional)"),
1584    ] = None,
1585    active_only: Annotated[
1586        bool,
1587        Field(description="If true, only return active (non-terminal) rollouts"),
1588    ] = False,
1589    limit: Annotated[
1590        int,
1591        Field(description="Maximum number of results (default: 100)"),
1592    ] = 100,
1593) -> list[ConnectorRolloutInfo]:
1594    """Query connector rollouts with flexible filtering.
1595
1596    Returns rollouts based on the provided filters. If no filters are specified,
1597    returns all active rollouts. Useful for monitoring rollout status and history.
1598
1599    Filter behavior:
1600    - rollout_id: Returns that specific rollout (ignores other filters)
1601    - active_only: Returns only active (non-terminal) rollouts
1602    - actor_definition_id: Returns rollouts for that specific connector
1603    - No filters: Returns all active rollouts (same as active_only=True)
1604    """
1605    rows = query_connector_rollouts(
1606        actor_definition_id=actor_definition_id,
1607        rollout_id=rollout_id,
1608        active_only=active_only,
1609        limit=limit,
1610    )
1611    return [_row_to_connector_rollout_info(row) for row in rows]
1612
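
# Illustrative sketch (assumption): client-side classification of rollout
# states into active vs terminal, mirroring what `active_only=True` is
# documented to return. The terminal set below (succeeded, failed_rolled_back,
# canceled) is an assumption; the authoritative filter lives in
# `query_connector_rollouts` and the platform's rollout state machine.
_EXAMPLE_TERMINAL_ROLLOUT_STATES = frozenset(
    {"succeeded", "failed_rolled_back", "canceled"}
)


def _example_is_active_rollout(state: str) -> bool:
    """Return True if a rollout state is non-terminal under the assumed set."""
    return state.lower() not in _EXAMPLE_TERMINAL_ROLLOUT_STATES
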
1613
1614@mcp_tool(
1615    read_only=True,
1616    idempotent=True,
1617    open_world=True,
1618)
1619def query_prod_connection_sync_activity(
1620    start_at: Annotated[
1621        datetime,
1622        Field(
1623            description=(
1624                "Inclusive start timestamp for the sync activity window. "
1625                "Must be timezone-aware (ISO 8601 with offset or `Z`)."
1626            ),
1627        ),
1628    ],
1629    end_at: Annotated[
1630        datetime,
1631        Field(
1632            description=(
1633                "Exclusive end timestamp for the sync activity window. "
1634                "Must be timezone-aware and strictly after `start_at`."
1635            ),
1636        ),
1637    ],
1638    organization_id: Annotated[
1639        str | OrganizationAliasEnum | None,
1640        Field(
1641            description=(
1642                "Optional organization UUID or alias. At least one of "
1643                "`organization_id`, `workspace_id`, or `connection_ids` is "
1644                "required. Accepts `@airbyte-internal` as an alias for the "
1645                "Airbyte internal org."
1646            ),
1647            default=None,
1648        ),
1649    ] = None,
1650    workspace_id: Annotated[
1651        str | WorkspaceAliasEnum | None,
1652        Field(
1653            description=(
1654                "Optional workspace UUID or alias. At least one of "
1655                "`organization_id`, `workspace_id`, or `connection_ids` is "
1656                "required. Accepts `@devin-ai-sandbox` as an alias for the "
1657                "Devin AI sandbox workspace."
1658            ),
1659            default=None,
1660        ),
1661    ] = None,
1662    connection_ids: Annotated[
1663        list[str] | None,
1664        Field(
1665            description=(
1666                "Optional list of connection UUIDs. At least one of "
1667                "`organization_id`, `workspace_id`, or `connection_ids` is "
1668                "required."
1669            ),
1670            default=None,
1671        ),
1672    ] = None,
1673    status_filter: Annotated[
1674        StatusFilter,
1675        Field(
1676            description=(
1677                "Filter by job status: `all` (default), `succeeded`, or "
1678                "`failed`. Applied to `jobs.status` in the Prod DB Replica."
1679            ),
1680            default=StatusFilter.ALL,
1681        ),
1682    ] = StatusFilter.ALL,
1683    limit: Annotated[
1684        int,
1685        Field(
1686            description="Maximum number of attempt rows to return.",
1687            default=1000,
1688        ),
1689    ] = 1000,
1690) -> list[dict[str, Any]]:
1691    """List recent sync jobs and attempts from the Prod DB Replica.
1692
1693    Returns one row per `(job, attempt)` pair for sync jobs whose `updated_at`
1694    falls in `[start_at, end_at)`, scoped to the provided organization,
1695    workspace, or connection IDs. Designed for live operational lookups
1696    (e.g. "what happened on this connection in the last hour"), not for
1697    historical analysis.
1698
1699    Each row is enriched with `customer_tier` and `is_eu` for the owning
1700    organization. Tier filtering is intentionally not applied, since this is
1701    a read-only observability query.
1702
1703    Input requirements:
1704    - At least one of `organization_id`, `workspace_id`, or `connection_ids`
1705      must be provided (any combination is accepted).
1706    - `start_at` and `end_at` must be timezone-aware and `start_at < end_at`.
1707
1708    Key fields in each row:
1709    - `job_id`, `attempt_id`, `attempt_number`
1710    - `job_status`, `attempt_status`
1711    - `job_started_at`, `job_updated_at`, `attempt_ended_at`
1712    - `failure_summary` (JSON; populated when an attempt failed)
1713    - `connection_id`, `connection_name`, `connection_status`
1714    - `source_actor_id`, `source_actor_name`, `source_actor_definition_id`
1715    - `destination_actor_id`, `destination_actor_name`,
1716      `destination_actor_definition_id`
1717    - `workspace_id`, `workspace_name`, `organization_id`
1718    - `dataplane_group_id`, `dataplane_name`
1719    - `customer_tier`, `is_eu` (added by tier enrichment)
1720    """
1721    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
1722    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
1723    _validate_sync_activity_scope(
1724        organization_id=resolved_organization_id,
1725        workspace_id=resolved_workspace_id,
1726        connection_ids=connection_ids,
1727    )
1728    normalized_start_at, normalized_end_at = _validate_sync_activity_window(
1729        start_at=start_at,
1730        end_at=end_at,
1731    )
1732
1733    rows = query_connection_sync_activity_from_prod(
1734        start_at=normalized_start_at,
1735        end_at=normalized_end_at,
1736        organization_id=resolved_organization_id,
1737        workspace_id=resolved_workspace_id,
1738        connection_ids=connection_ids,
1739        status_filter=status_filter.value,
1740        limit=limit,
1741    )
1742    return enrich_rows_by_org(rows)
1743
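
# Illustrative sketch (hypothetical): the input checks that
# `_validate_sync_activity_scope` and `_validate_sync_activity_window` are
# documented to enforce, combined into one standalone function. The real
# helpers are defined elsewhere in this module and may differ, for example in
# error types or in how timestamps are normalized; converting to UTC here is
# an assumption.
from datetime import datetime, timezone


def _example_check_sync_activity_inputs(
    start_at: datetime,
    end_at: datetime,
    organization_id: str | None = None,
    workspace_id: str | None = None,
    connection_ids: list[str] | None = None,
) -> tuple[datetime, datetime]:
    """Validate scope and window, returning the window converted to UTC."""
    # At least one scope is required; an empty list counts as missing.
    if not (organization_id or workspace_id or connection_ids):
        raise ValueError(
            "At least one of organization_id, workspace_id, or connection_ids is required."
        )
    # Naive datetimes are rejected so the window is unambiguous.
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        if value.tzinfo is None or value.utcoffset() is None:
            raise ValueError(f"{name} must be timezone-aware.")
    # The window is half-open [start_at, end_at), so it must be non-empty.
    if start_at >= end_at:
        raise ValueError("start_at must be strictly before end_at.")
    return start_at.astimezone(timezone.utc), end_at.astimezone(timezone.utc)
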
1744
1745def register_prod_db_query_tools(app: FastMCP) -> None:
1746    """Register prod DB query tools with the FastMCP app."""
1747    register_mcp_tools(app, mcp_module=__name__)