airbyte_ops_mcp.mcp.prod_db_queries
MCP tools for querying the Airbyte Cloud Prod DB Replica.
This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.
MCP reference
MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 14 tool(s), 0 prompt(s), 0 resource(s).
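MCP clients invoke each tool below with a JSON-RPC 2.0 tools/call request whose params carry the tool name and an arguments object matching the tool's input schema. The following is a minimal sketch of that request envelope. It assumes a client that builds raw JSON-RPC messages itself; the helper name build_tools_call and the all-zero UUID are hypothetical placeholders, not part of this server.

```python
import json
from typing import Any


def build_tools_call(request_id: int, tool_name: str, arguments: dict[str, Any]) -> str:
    """Serialize a JSON-RPC 2.0 tools/call request for an MCP server (hypothetical helper)."""
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments},
    }
    return json.dumps(payload)


request = build_tools_call(
    1,
    "query_prod_actors_by_pinned_connector_version",
    # Placeholder UUID for illustration only.
    {"connector_version_id": "00000000-0000-0000-0000-000000000000"},
)
print(request)
```

In practice an MCP client library usually builds this envelope for you. The sketch only shows where the tool name and the arguments object go.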
Tools (14)
query_prod_actors_by_pinned_connector_version
Hints: read-only · idempotent
List actors (sources/destinations) effectively pinned to a specific connector version.
Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.
The actor_id field is the actor ID (superset of source_id/destination_id).
Returns list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
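The actor > workspace > organization precedence can be sketched in a few lines. This is an illustration of the documented rule, not the tool's SQL. The Pin dataclass and its fields are hypothetical stand-ins for whatever the Prod DB stores per scope.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Pin:
    """Hypothetical pin record; only the fields needed for the illustration."""
    connector_version_id: str
    origin: str


def effective_pin(
    actor_pin: Optional[Pin],
    workspace_pin: Optional[Pin],
    organization_pin: Optional[Pin],
) -> tuple[Optional[Pin], Optional[str]]:
    """Return the winning pin and its pin_scope_type (actor > workspace > organization)."""
    for scope_type, pin in (
        ("actor", actor_pin),
        ("workspace", workspace_pin),
        ("organization", organization_pin),
    ):
        if pin is not None:
            return pin, scope_type
    return None, None  # not pinned at any scope


# Made-up values: an organization-wide pin overridden by an actor-level pin.
org_pin = Pin(connector_version_id="v1", origin="organization pin")
actor_pin = Pin(connector_version_id="v2", origin="actor pin")
pin, scope = effective_pin(actor_pin, None, org_pin)
print(pin.connector_version_id, scope)  # v2 actor
```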
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_version_id | string | yes | — | Connector version UUID to find pinned instances for |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"description": "Connector version UUID to find pinned instances for",
"type": "string"
}
},
"required": [
"connector_version_id"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
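The output schema above is flagged with x-fastmcp-wrap-result. As we understand FastMCP's convention, this means the tool's list return value is delivered wrapped in a single-key object, {"result": [...]}. A minimal sketch of unwrapping it on the client side, assuming you already have the structured content as a dict; the function name and sample row are hypothetical.

```python
from typing import Any


def unwrap_result(structured_content: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the row list from a {"result": [...]} wrapped tool response."""
    rows = structured_content["result"]
    if not isinstance(rows, list):
        raise TypeError("expected 'result' to be a list of row objects")
    return rows


# Made-up response using field names documented for this tool.
response = {"result": [{"actor_id": "a1", "pin_scope_type": "workspace"}]}
for row in unwrap_result(response):
    print(row["actor_id"], row["pin_scope_type"])  # a1 workspace
```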
query_prod_connection_sync_activity
Hints: read-only · idempotent · open-world
List recent sync jobs and attempts from the Prod DB Replica.
Returns one row per (job, attempt) pair for sync jobs whose updated_at falls in [start_at, end_at), scoped to the provided organization, workspace, or connection IDs. Designed for live operational lookups (e.g. "what happened on this connection in the last hour"), not for historical analysis.
Each row is enriched with customer_tier and is_eu for the owning organization. Tier filtering is intentionally not applied, because this is a read-only observability query.
Input requirements:
- At least one of organization_id, workspace_id, or connection_ids must be provided (any combination is accepted).
- start_at and end_at must be timezone-aware, and start_at must be strictly before end_at.
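These input requirements can be checked client-side before calling the tool. The following is a minimal sketch, assuming Python datetime objects that are serialized with isoformat() for the start_at and end_at strings; the function names are hypothetical, and treating an empty connection_ids list as "not provided" is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def validate_sync_activity_args(
    start_at: datetime,
    end_at: datetime,
    organization_id: Optional[str] = None,
    workspace_id: Optional[str] = None,
    connection_ids: Optional[list[str]] = None,
) -> None:
    """Raise ValueError if the arguments break the documented input requirements."""
    # An empty connection_ids list is treated as "not provided" (an assumption).
    if not (organization_id or workspace_id or connection_ids):
        raise ValueError(
            "at least one of organization_id, workspace_id, or connection_ids is required"
        )
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        # A datetime is timezone-aware only if utcoffset() returns a value.
        if value.utcoffset() is None:
            raise ValueError(f"{name} must be timezone-aware")
    if not start_at < end_at:
        raise ValueError("start_at must be strictly before end_at")


def last_hour_window(now: Optional[datetime] = None) -> tuple[datetime, datetime]:
    """Return a half-open [start, end) window covering the hour before now (UTC)."""
    end = now if now is not None else datetime.now(timezone.utc)
    return end - timedelta(hours=1), end


start, end = last_hour_window()
connection_ids = ["<connection-uuid>"]  # placeholder, not a real ID
validate_sync_activity_args(start, end, connection_ids=connection_ids)
arguments = {
    "start_at": start.isoformat(),  # ISO 8601 with a +00:00 offset
    "end_at": end.isoformat(),
    "connection_ids": connection_ids,
}
```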
Key fields in each row:
- job_id, attempt_id, attempt_number
- job_status, attempt_status
- job_started_at, job_updated_at, attempt_ended_at
- failure_summary (JSON; populated when an attempt failed)
- connection_id, connection_name, connection_status
- source_actor_id, source_actor_name, source_actor_definition_id
- destination_actor_id, destination_actor_name, destination_actor_definition_id
- workspace_id, workspace_name, organization_id
- dataplane_group_id, dataplane_name
- customer_tier, is_eu (added by tier enrichment)
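Because the tool returns one row per (job, attempt) pair, a retried job appears more than once. A minimal sketch of collapsing the rows to each job's most recent attempt, keyed on job_id and attempt_number from the key-fields list; the function name and sample rows are hypothetical, and the status strings are illustrative.

```python
from typing import Any


def latest_attempt_per_job(rows: list[dict[str, Any]]) -> dict[Any, dict[str, Any]]:
    """Collapse (job, attempt) rows to the row with the highest attempt_number per job_id."""
    latest: dict[Any, dict[str, Any]] = {}
    for row in rows:
        current = latest.get(row["job_id"])
        if current is None or row["attempt_number"] > current["attempt_number"]:
            latest[row["job_id"]] = row
    return latest


# Made-up rows: job 101 failed once and then succeeded; job 102 failed.
rows = [
    {"job_id": 101, "attempt_number": 0, "attempt_status": "failed"},
    {"job_id": 101, "attempt_number": 1, "attempt_status": "succeeded"},
    {"job_id": 102, "attempt_number": 0, "attempt_status": "failed"},
]
still_failing = [
    job_id
    for job_id, row in latest_attempt_per_job(rows).items()
    if row["attempt_status"] == "failed"
]
print(still_failing)  # [102]
```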
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| start_at | string | yes | — | Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or Z). |
| end_at | string | yes | — | Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after start_at. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @airbyte-internal as an alias for the Airbyte internal org. |
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") \| null | no | null | Optional workspace UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @devin-ai-sandbox as an alias for the Devin AI sandbox workspace. |
| connection_ids | array<string> \| null | no | null | Optional list of connection UUIDs. At least one of organization_id, workspace_id, or connection_ids is required. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: all (default), succeeded, or failed. Applied to jobs.status in the Prod DB Replica. |
| limit | integer | no | 1000 | Maximum number of attempt rows to return. |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"start_at": {
"description": "Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or `Z`).",
"format": "date-time",
"type": "string"
},
"end_at": {
"description": "Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after `start_at`.",
"format": "date-time",
"type": "string"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@airbyte-internal` as an alias for the Airbyte internal org."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional workspace UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@devin-ai-sandbox` as an alias for the Devin AI sandbox workspace."
},
"connection_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional list of connection UUIDs. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required."
},
"status_filter": {
"description": "Filter by job status: `all` (default), `succeeded`, or `failed`. Applied to `jobs.status` in the Prod DB Replica.",
"enum": [
"all",
"succeeded",
"failed"
],
"type": "string",
"default": "all"
},
"limit": {
"default": 1000,
"description": "Maximum number of attempt rows to return.",
"type": "integer"
}
},
"required": [
"start_at",
"end_at"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connections_by_connector
Hints: read-only · idempotent · open-world
Search for all connections using a specific source or destination connector type.
This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.
Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter keeps querying tier-aware; if it is omitted, it defaults to 'TIER_2'.
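The documented tier semantics (keep connections whose organization is in exactly the specified tier, or everything for 'ALL') can be mirrored client-side, for example to re-slice a result fetched with 'ALL'. This is a minimal sketch of that rule, not the tool's implementation; the function name and sample rows are hypothetical.

```python
from typing import Any

TIER_FILTERS = ("TIER_0", "TIER_1", "TIER_2", "ALL")


def filter_by_tier(
    rows: list[dict[str, Any]], customer_tier_filter: str = "TIER_2"
) -> list[dict[str, Any]]:
    """Keep rows whose customer_tier equals the filter; 'ALL' keeps every row."""
    if customer_tier_filter not in TIER_FILTERS:
        raise ValueError(f"customer_tier_filter must be one of {TIER_FILTERS}")
    if customer_tier_filter == "ALL":
        return list(rows)
    return [row for row in rows if row.get("customer_tier") == customer_tier_filter]


# Made-up rows fetched with customer_tier_filter='ALL'.
rows = [
    {"connection_id": "c1", "customer_tier": "TIER_0"},
    {"connection_id": "c2", "customer_tier": "TIER_2"},
]
print([r["connection_id"] for r in filter_by_tier(rows)])  # ['c2']
```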
Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.
Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs:
- For source queries: connection_id, connection_name, connection_url, source_id, source_name, source_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
- For destination queries: connection_id, connection_name, connection_url, destination_id, destination_name, destination_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from (NULL if not pinned).
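Because pin_scope_type is NULL for unpinned connections, a result fetched without exclude_pinned can be split client-side. This is a minimal sketch, assuming NULL arrives as Python None after JSON decoding; the function names and sample rows are hypothetical.

```python
from collections import Counter
from typing import Any


def unpinned_only(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Keep connections with no effective pin at any scope (pin_scope_type is NULL)."""
    return [row for row in rows if row.get("pin_scope_type") is None]


def pin_scope_counts(rows: list[dict[str, Any]]) -> Counter:
    """Count rows per pin_scope_type, labelling NULL as 'unpinned'."""
    return Counter(row.get("pin_scope_type") or "unpinned" for row in rows)


# Made-up rows.
rows = [
    {"connection_id": "c1", "pin_scope_type": None},
    {"connection_id": "c2", "pin_scope_type": "actor"},
    {"connection_id": "c3", "pin_scope_type": "organization"},
]
print([r["connection_id"] for r in unpinned_only(rows)])  # ['c1']
```

Passing exclude_pinned=True lets the server do the same filtering before the limit is applied, which matters when the limit would otherwise truncate the unpinned set.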
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 1000 | Maximum number of results (default: 1000) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections). |
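The "exactly one of" rule for the four connector selectors can be checked before sending a request. This is a minimal client-side sketch; the function name check_single_selector is hypothetical, and the server performs its own validation regardless.

```python
from typing import Optional

CONNECTOR_SELECTORS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def check_single_selector(**selectors: Optional[str]) -> str:
    """Return the one connector selector that is set; raise if zero or several are set."""
    unknown = sorted(set(selectors) - set(CONNECTOR_SELECTORS))
    if unknown:
        raise TypeError(f"unexpected selector(s): {unknown}")
    provided = [name for name in CONNECTOR_SELECTORS if selectors.get(name) is not None]
    if len(provided) != 1:
        raise ValueError(
            f"exactly one of {', '.join(CONNECTOR_SELECTORS)} is required, got {provided}"
        )
    return provided[0]


print(check_single_selector(destination_canonical_name="destination-duckdb"))
# destination_canonical_name
```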
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 1000,
"description": "Maximum number of results (default: 1000)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
"type": "boolean"
}
},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connections_by_stream
Hints: read-only · idempotent · open-world
Find connections that have a specific stream enabled in their catalog.
This tool searches the connection's configured catalog (JSONB) for streams matching the specified name. It is particularly useful when validating connector fixes that affect specific streams: you can quickly find customer connections that use the affected stream.
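The stream match is exact and case-sensitive. A minimal sketch of the same check over an in-memory catalog follows. It assumes the stored catalog follows the Airbyte protocol's ConfiguredAirbyteCatalog shape (streams[].stream.name); the tool's actual JSONB query is not shown here, and the function name and sample catalog are hypothetical.

```python
from typing import Any


def catalog_has_stream(catalog: dict[str, Any], stream_name: str) -> bool:
    """True if a stream named exactly stream_name appears in the configured catalog."""
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )


# Made-up catalog in the assumed ConfiguredAirbyteCatalog shape.
catalog = {
    "streams": [
        {"stream": {"name": "campaigns"}, "sync_mode": "incremental"},
        {"stream": {"name": "global_exclusions"}, "sync_mode": "full_refresh"},
    ]
}
print(catalog_has_stream(catalog, "campaigns"))  # True
print(catalog_has_stream(catalog, "Campaigns"))  # False: the match is exact
```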
Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter keeps querying tier-aware; if it is omitted, it defaults to 'TIER_2'.
Use cases:
- Finding connections with a specific stream enabled for regression testing
- Validating connector fixes that affect particular streams
- Identifying which customers use rarely-enabled streams
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| stream_name | string | yes | — | Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'. |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"stream_name": {
"description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
"type": "string"
},
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"stream_name"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connector_connection_stats
Hints: read-only · idempotent · open-world
Get aggregate connection stats for multiple connectors.
Returns counts of connections grouped by pinned version for each connector, including:
- Total, enabled, and active connection counts
- Pinned vs unpinned breakdown
- Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)
This tool is designed for release monitoring workflows. It allows you to:
- Query recently released connectors to identify which ones to monitor
- Get aggregate stats showing how many connections are using each version
- See health metrics (pass/fail) broken down by version
The lookback_days parameter controls the lookback window for:
- Counting 'active' connections (those with recent sync activity)
- Determining 'latest attempt status' (most recent attempt within the window)
Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
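The lookback rule for the latest_attempt breakdown can be sketched as follows: a connection counts under its latest attempt's status only if that attempt falls inside the window; otherwise it counts as unknown. This is an illustration, not the tool's query. The input shape (connection ID mapped to a (status, timestamp) pair or None) is a hypothetical choice, and mapping unrecognized statuses to unknown is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

STATUSES = ("succeeded", "failed", "cancelled", "running")


def latest_attempt_breakdown(
    latest_attempts: dict[str, Optional[tuple[str, datetime]]],
    lookback_days: int = 7,
    now: Optional[datetime] = None,
) -> dict[str, int]:
    """Bucket each connection's latest attempt; missing or stale attempts become 'unknown'."""
    now = now if now is not None else datetime.now(timezone.utc)
    cutoff = now - timedelta(days=lookback_days)
    counts = {status: 0 for status in (*STATUSES, "unknown")}
    for attempt in latest_attempts.values():
        # Unrecognized statuses are also bucketed as 'unknown' (an assumption).
        if attempt is None or attempt[1] < cutoff or attempt[0] not in STATUSES:
            counts["unknown"] += 1
        else:
            counts[attempt[0]] += 1
    return counts


now = datetime(2024, 1, 8, tzinfo=timezone.utc)
example = {
    "conn-a": ("succeeded", datetime(2024, 1, 7, tzinfo=timezone.utc)),
    "conn-b": ("failed", datetime(2023, 12, 25, tzinfo=timezone.utc)),  # outside the window
    "conn-c": None,  # no attempts at all
}
print(latest_attempt_breakdown(example, lookback_days=7, now=now))
# {'succeeded': 1, 'failed': 0, 'cancelled': 0, 'running': 0, 'unknown': 2}
```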
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_ids | array<string> \| null | no | null | List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139'] |
| destination_definition_ids | array<string> \| null | no | null | List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610'] |
| lookback_days | integer | no | 7 | Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active. |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"source_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
},
"destination_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
"type": "integer"
}
},
"type": "object"
}
Output JSON schema:
{
"description": "Response containing connection stats for multiple connectors.",
"properties": {
"sources": {
"description": "Stats for source connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"destinations": {
"description": "Stats for destination connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"lookback_days": {
"description": "Lookback window used for 'active' connections",
"type": "integer"
},
"generated_at": {
"description": "When this response was generated",
"format": "date-time",
"type": "string"
}
},
"required": [
"lookback_days",
"generated_at"
],
"type": "object"
}
query_prod_connector_rollouts
Hints: read-only · idempotent
Query connector rollouts with flexible filtering.
Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.
Filter behavior:
- rollout_id: Returns that specific rollout (ignores other filters)
- active_only: Returns only active (non-terminal) rollouts
- actor_definition_id: Returns rollouts for that specific connector
- No filters: Returns all active rollouts (same as active_only=True)
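The filter precedence above can be mirrored over an in-memory list of rollouts. This is a minimal sketch of the documented behavior, not the tool's query. The function name is hypothetical, and the set of terminal states (succeeded, failed_rolled_back, canceled) is an assumption drawn from the state list in the output schema; the tool may classify states differently.

```python
from typing import Any, Optional

# Assumption: these states count as terminal; the tool's own definition may differ.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def select_rollouts(
    rollouts: list[dict[str, Any]],
    actor_definition_id: Optional[str] = None,
    rollout_id: Optional[str] = None,
    active_only: bool = False,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Apply the documented filter precedence to an in-memory list of rollouts."""
    if rollout_id is not None:
        # A specific rollout_id wins and the other filters are ignored.
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:1]
    if actor_definition_id is None:
        # With no connector filter, behave like active_only=True.
        active_only = True
    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    if active_only:
        selected = [r for r in selected if r["state"] not in TERMINAL_STATES]
    return selected[:limit]


# Made-up rollouts.
rollouts = [
    {"rollout_id": "r1", "actor_definition_id": "d1", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "d1", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "d2", "state": "paused"},
]
print([r["rollout_id"] for r in select_rollouts(rollouts)])  # ['r1', 'r3']
print([r["rollout_id"] for r in select_rollouts(rollouts, actor_definition_id="d1")])  # ['r1', 'r2']
```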
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| actor_definition_id | string \| null | no | null | Connector definition UUID to filter by (optional) |
| rollout_id | string \| null | no | null | Specific rollout UUID to look up (optional) |
| active_only | boolean | no | false | If true, only return active (non-terminal) rollouts |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"actor_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector definition UUID to filter by (optional)"
},
"rollout_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Specific rollout UUID to look up (optional)"
},
"active_only": {
"default": false,
"description": "If true, only return active (non-terminal) rollouts",
"type": "boolean"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"description": "Information about a connector rollout.",
"properties": {
"rollout_id": {
"description": "The rollout UUID",
"type": "string"
},
"actor_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"state": {
"description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
"type": "string"
},
"initial_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Initial rollout percentage"
},
"current_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Current target rollout percentage"
},
"final_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Final target rollout percentage"
},
"has_breaking_changes": {
"description": "Whether the RC has breaking changes",
"type": "boolean"
},
"max_step_wait_time_mins": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Maximum wait time between rollout steps in minutes"
},
"rollout_strategy": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Rollout strategy: manual, automated, overridden"
},
"updated_by_user_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "User ID recorded as last updating the rollout"
},
"updated_by_user_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Name recorded as last updating the rollout"
},
"updated_by_user_email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Email recorded as last updating the rollout"
},
"workflow_run_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Temporal workflow run ID"
},
"error_msg": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Error message if errored"
},
"failed_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for failure if failed"
},
"paused_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for pause if paused"
},
"tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional tag for the rollout"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was created"
},
"updated_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was last updated"
},
"completed_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout completed (if terminal)"
},
"expires_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout expires"
},
"rc_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the release candidate"
},
"rc_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the release candidate"
},
"initial_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the initial version"
},
"initial_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the initial version"
},
"filters": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
}
},
"required": [
"rollout_id",
"actor_definition_id",
"state",
"has_breaking_changes"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
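The `customer_tier` field is described as being extracted from `filters`. Below is a minimal sketch of that extraction. It assumes the `{'tierFilter': {'tier': ...}}` shape from the `filters` description is the only shape that carries a tier. `extract_customer_tier` is a hypothetical helper for client-side use, not a function in this module.

```python
from typing import Any, Optional


def extract_customer_tier(filters: Optional[dict[str, Any]]) -> Optional[str]:
    """Return the tier targeted by a rollout's raw filters, or None.

    Assumes the {'tierFilter': {'tier': 'TIER_0'}} shape from the schema
    description; any other shape is treated as "no tier filter set".
    """
    if not filters:
        return None
    tier_filter = filters.get("tierFilter")
    if not isinstance(tier_filter, dict):
        return None
    tier = tier_filter.get("tier")
    return tier if isinstance(tier, str) else None


# Illustrative rows shaped like the output schema; the values are made up.
rollouts = [
    {"rollout_id": "r1", "state": "in_progress", "filters": {"tierFilter": {"tier": "TIER_0"}}},
    {"rollout_id": "r2", "state": "paused", "filters": None},
]

for rollout in rollouts:
    # Prints "r1 TIER_0" and then "r2 None".
    print(rollout["rollout_id"], extract_customer_tier(rollout["filters"]))
```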
query_prod_connector_versions
Hints: read-only · idempotent
List all versions for a connector definition.
Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.
Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
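When hunting for a version ID to pin or monitor, a client typically looks a row up by `docker_image_tag`. The sketch below works on rows shaped like this tool's output, using only a subset of the documented keys. The helper names and sample values are hypothetical. `newest_version` sorts by `last_published` itself rather than trusting the result order.

```python
from datetime import datetime, timezone
from typing import Any, Optional

# Illustrative rows using a subset of the documented keys; values are made up.
versions = [
    {"version_id": "v-130", "docker_image_tag": "1.3.0",
     "last_published": datetime(2025, 3, 1, tzinfo=timezone.utc)},
    {"version_id": "v-120", "docker_image_tag": "1.2.0",
     "last_published": datetime(2025, 2, 1, tzinfo=timezone.utc)},
]


def version_id_for_tag(rows: list[dict[str, Any]], docker_image_tag: str) -> Optional[str]:
    """Return the version_id published under docker_image_tag, or None."""
    for row in rows:
        if row["docker_image_tag"] == docker_image_tag:
            return row["version_id"]
    return None


def newest_version(rows: list[dict[str, Any]]) -> Optional[dict[str, Any]]:
    """Return the most recently published row without relying on result order."""
    published = [row for row in rows if row["last_published"] is not None]
    return max(published, key=lambda row: row["last_published"], default=None)


print(version_id_for_tag(versions, "1.2.0"))  # v-120
print(newest_version(versions)["docker_image_tag"])  # 1.3.0
```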
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_definition_id | string | yes | — | Connector definition UUID to list versions for |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"connector_definition_id": {
"description": "Connector definition UUID to list versions for",
"type": "string"
}
},
"required": [
"connector_definition_id"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_dataplanes
Hints: read-only · idempotent
List all dataplane groups with workspace counts.
Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).
Returns list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
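The per-region distribution mentioned above can be computed by summing `workspace_count` by `dataplane_name`. Below is a small sketch over made-up rows. The tool already returns only active groups, so skipping disabled or tombstoned rows is a defensive assumption, not a documented requirement. `workspaces_per_region` is a hypothetical helper.

```python
from collections import Counter
from typing import Any

# Illustrative rows; dataplane names and counts are made up.
dataplanes = [
    {"dataplane_name": "US", "enabled": True, "tombstone": False, "workspace_count": 120},
    {"dataplane_name": "US", "enabled": True, "tombstone": False, "workspace_count": 5},
    {"dataplane_name": "EU", "enabled": True, "tombstone": False, "workspace_count": 30},
    {"dataplane_name": "EU", "enabled": False, "tombstone": True, "workspace_count": 2},
]


def workspaces_per_region(rows: list[dict[str, Any]]) -> Counter:
    """Sum workspace_count per dataplane_name, skipping disabled or tombstoned groups."""
    totals: Counter = Counter()
    for row in rows:
        if row["tombstone"] or not row["enabled"]:
            continue
        totals[row["dataplane_name"]] += row["workspace_count"]
    return totals


totals = workspaces_per_region(dataplanes)
grand_total = sum(totals.values())
for name, count in totals.most_common():
    print(f"{name}: {count} workspaces ({count / grand_total:.0%})")
```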
Parameters:
_No parameters._
Input JSON schema:
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_failed_sync_attempts_for_connector
Hints: read-only · idempotent · open-world
List failed sync attempts for ALL actors using a source connector type.
This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include every tier.
This is useful for investigating connector issues across all users. Use this when you want to find failures for a connector type regardless of which version users are on.
Note: This tool only supports SOURCE connectors. For destination connectors, query_prod_recent_syncs_for_connector with status_filter='failed' returns failed sync jobs (not individual attempts).
Key fields in results:
- failure_summary: JSON containing failure details including failureType and messages
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
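To triage results, a client may want to group failures by tier and failure type. In the sketch below, the exact layout of `failure_summary` is an **assumption**: it takes a top-level `"failures"` list whose entries carry `failureType`. Check a real row before relying on it. The code accepts the column either as a JSON string or as an already-decoded dict. `count_failure_types` and the sample rows are hypothetical.

```python
import json
from collections import Counter
from typing import Any

# Illustrative rows; values are made up. The {"failures": [{"failureType": ...}]}
# shape is an ASSUMPTION, so check a real failure_summary before relying on it.
attempts = [
    {
        "customer_tier": "TIER_1",
        "failure_summary": json.dumps(
            {"failures": [{"failureType": "config_error", "externalMessage": "Bad credentials"}]}
        ),
    },
    {
        "customer_tier": "TIER_2",
        "failure_summary": {
            "failures": [{"failureType": "system_error"}, {"failureType": "config_error"}]
        },
    },
    {"customer_tier": "TIER_2", "failure_summary": None},
]


def count_failure_types(rows: list[dict[str, Any]]) -> Counter:
    """Count failures per (customer_tier, failureType) across attempt rows."""
    counts: Counter = Counter()
    for row in rows:
        summary = row.get("failure_summary")
        if isinstance(summary, str):
            summary = json.loads(summary)  # the column may arrive as a JSON string
        failures = (summary.get("failures") or []) if isinstance(summary, dict) else []
        if not failures:
            counts[(row.get("customer_tier"), "<no failure details>")] += 1
        for failure in failures:
            counts[(row.get("customer_tier"), failure.get("failureType", "<unknown>"))] += 1
    return counts


for (tier, failure_type), n in count_failure_types(attempts).most_common():
    print(tier, failure_type, n)
```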
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
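The organization_id parameter accepts either a UUID or the '@airbyte-internal' alias. According to the schema, that alias maps to UUID `664c690e-5263-49ba-b01f-4a6759b3330a`. The module resolves aliases through `OrganizationAliasEnum.resolve()`, whose signature is not shown here, so the sketch below substitutes a hypothetical plain-dict lookup that shows the same idea. Plain UUIDs are validated and normalized with the standard `uuid` module.

```python
import uuid
from typing import Optional

# Hypothetical stand-in for OrganizationAliasEnum; the alias and UUID are the
# ones listed in the organization_id schema.
ORG_ALIASES = {"@airbyte-internal": "664c690e-5263-49ba-b01f-4a6759b3330a"}


def resolve_org_id(value: Optional[str]) -> Optional[str]:
    """Map an alias to its organization UUID; validate and normalize plain UUIDs."""
    if value is None:
        return None  # no organization filter
    if value in ORG_ALIASES:
        return ORG_ALIASES[value]
    if value.startswith("@"):
        raise ValueError(f"Unknown organization alias: {value!r}")
    return str(uuid.UUID(value))  # raises ValueError for malformed UUIDs


print(resolve_org_id("@airbyte-internal"))
print(resolve_org_id("664C690E-5263-49BA-B01F-4A6759B3330A"))
```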
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_new_connector_releases
Hints: read-only · idempotent
List recently published connector versions.
Returns connector versions published within the specified number of days. Uses the last_published timestamp, which reflects when the version was actually deployed to the registry (not the changelog date).
Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
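The distinction between `last_published` and `release_date` matters when a version is pushed to the registry long after its changelog date. The sketch below applies the same kind of lookback window on the client side, keyed on `last_published`. The sample rows are made up: both share an old `release_date`, but only one was recently published. `published_within` is a hypothetical helper, and "now" is pinned so the output is reproducible.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

# Illustrative rows; values are made up. Both share an old release_date, but
# 2.0.1 was pushed to the registry much later.
releases = [
    {"docker_repository": "airbyte/source-example", "docker_image_tag": "2.0.0",
     "release_date": "2025-01-10", "last_published": datetime(2025, 1, 11, tzinfo=timezone.utc)},
    {"docker_repository": "airbyte/source-example", "docker_image_tag": "2.0.1",
     "release_date": "2025-01-10", "last_published": datetime(2025, 3, 2, tzinfo=timezone.utc)},
]


def published_within(
    rows: list[dict[str, Any]], days: int, now: Optional[datetime] = None
) -> list[dict[str, Any]]:
    """Keep rows whose last_published falls within the last `days` days, newest first."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    recent = [r for r in rows if r["last_published"] is not None and r["last_published"] >= cutoff]
    return sorted(recent, key=lambda r: r["last_published"], reverse=True)


# Pin "now" so the example is reproducible.
now = datetime(2025, 3, 5, tzinfo=timezone.utc)
for row in published_within(releases, days=7, now=now):
    print(row["docker_repository"], row["docker_image_tag"], row["last_published"].date())
```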
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_recent_syncs_for_connector
Hints: read-only · idempotent · open-world
List recent sync jobs for ALL actors using a connector type.
This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include every tier.
Use this tool to:
- Find healthy connections with recent successful syncs (status_filter='succeeded')
- Investigate connector issues across all users (status_filter='failed')
- Get an overview of all recent sync activity (status_filter='all')
Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.
Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
Key fields in results:
- job_status: 'succeeded', 'failed', 'cancelled', etc.
- connection_id, connection_name: The connection that ran the sync
- actor_id, actor_name: The source or destination actor
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
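The key fields above support a quick health summary: count jobs by `job_status`, and list connections that succeeded while unpinned. A NULL `pin_scope_type` means no pin applies at any scope. This mirrors what `exclude_pinned=True` does on the server. The sketch below uses hypothetical helper names and made-up rows.

```python
from collections import Counter
from typing import Any

# Illustrative rows using documented keys; values are made up.
syncs = [
    {"connection_id": "c1", "job_status": "succeeded", "pin_scope_type": None, "customer_tier": "TIER_2"},
    {"connection_id": "c2", "job_status": "failed", "pin_scope_type": None, "customer_tier": "TIER_2"},
    {"connection_id": "c3", "job_status": "succeeded", "pin_scope_type": "workspace", "customer_tier": "TIER_1"},
    {"connection_id": "c1", "job_status": "succeeded", "pin_scope_type": None, "customer_tier": "TIER_2"},
]


def summarize_syncs(rows: list[dict[str, Any]]) -> tuple[Counter, list[str]]:
    """Count jobs by status and list unpinned connections with a successful sync."""
    status_counts = Counter(row["job_status"] for row in rows)
    unpinned_healthy = sorted(
        {
            row["connection_id"]
            for row in rows
            # pin_scope_type is NULL (None) when no pin applies at any scope.
            if row["job_status"] == "succeeded" and row["pin_scope_type"] is None
        }
    )
    return status_counts, unpinned_healthy


counts, candidates = summarize_syncs(syncs)
print(dict(counts))
print(candidates)
```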
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs). |
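The four connector selectors are mutually exclusive, and exactly one must be provided. A minimal sketch of that rule follows. `pick_connector_selector` is hypothetical, and it treats empty strings the same as None, which is an assumption. In the real module, a canonical name would then be mapped to a definition ID, presumably via the imported `resolve_canonical_name_to_definition_id`, whose signature is not shown and which is therefore not called here.

```python
from typing import Optional


def pick_connector_selector(
    source_definition_id: Optional[str] = None,
    source_canonical_name: Optional[str] = None,
    destination_definition_id: Optional[str] = None,
    destination_canonical_name: Optional[str] = None,
) -> tuple[str, str]:
    """Return (parameter_name, value) for the single selector that was provided."""
    candidates = {
        "source_definition_id": source_definition_id,
        "source_canonical_name": source_canonical_name,
        "destination_definition_id": destination_definition_id,
        "destination_canonical_name": destination_canonical_name,
    }
    # Treat None and empty strings as "not provided".
    provided = {name: value for name, value in candidates.items() if value}
    if len(provided) != 1:
        raise ValueError(
            "Provide exactly one of: " + ", ".join(candidates) + f" (got {len(provided)})."
        )
    ((name, value),) = provided.items()
    return name, value


print(pick_connector_selector(destination_canonical_name="destination-duckdb"))
```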
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
},
"status_filter": {
"description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
"enum": [
"all",
"succeeded",
"failed"
],
"type": "string",
"default": "all"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
"type": "boolean"
}
},
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_recent_syncs_for_version_pinned_connector
Hints: read-only · idempotent
List sync job results for actors PINNED to a specific connector version.
IMPORTANT: This tool ONLY returns results for actors that are effectively pinned to the specified version via scoped_configuration at any scope level (actor, workspace, or organization). Most connections run unpinned and will NOT appear in these results.
Use this tool when you want to monitor rollout health for actors that have been pinned to a pre-release or specific version. For finding healthy connections across ALL actors using a connector type (regardless of pinning), use query_prod_recent_syncs_for_connector instead.
The actor_id field is the actor ID (superset of source_id/destination_id).
Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, connector_definition_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
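For rollout monitoring, a useful view of this tool's output is the success rate grouped by the scope that supplied the pin. The sketch below computes that over made-up rows using the documented `job_status` and `pin_scope_type` keys. `success_rate_by_scope` is a hypothetical helper. It only makes sense with `successful_only` left at its default of False, because otherwise every rate is trivially 1.0.

```python
from collections import defaultdict
from typing import Any

# Illustrative rows using documented keys; values are made up.
pinned_syncs = [
    {"job_id": 1, "job_status": "succeeded", "pin_scope_type": "actor"},
    {"job_id": 2, "job_status": "failed", "pin_scope_type": "actor"},
    {"job_id": 3, "job_status": "succeeded", "pin_scope_type": "workspace"},
]


def success_rate_by_scope(rows: list[dict[str, Any]]) -> dict[str, float]:
    """Fraction of jobs with job_status 'succeeded', grouped by pin_scope_type."""
    tallies: dict[str, list[int]] = defaultdict(lambda: [0, 0])  # scope -> [succeeded, total]
    for row in rows:
        tally = tallies[row["pin_scope_type"]]
        tally[1] += 1
        if row["job_status"] == "succeeded":
            tally[0] += 1
    return {scope: ok / total for scope, (ok, total) in tallies.items()}


print(success_rate_by_scope(pinned_syncs))
```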
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_version_id | string | yes | — | Connector version UUID to find sync results for |
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| successful_only | boolean | no | false | If True, only return successful syncs (default: False) |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"description": "Connector version UUID to find sync results for",
"type": "string"
},
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"successful_only": {
"default": false,
"description": "If True, only return successful syncs (default: False)",
"type": "boolean"
}
},
"required": [
"connector_version_id"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_workspace_info
Hints: read-only · idempotent
Get workspace information including dataplane group.
Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.
Returns dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone, or None if the workspace is not found.
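For EU filtering, the result's `dataplane_name` is the field to inspect. The exact rule this module uses to derive `is_eu` is not shown. The sketch below **assumes** that an EU dataplane's name starts with "EU" (case-insensitive). It also handles the documented None result for an unknown workspace. `is_eu_workspace` is a hypothetical helper.

```python
from typing import Any, Optional


def is_eu_workspace(info: Optional[dict[str, Any]]) -> Optional[bool]:
    """Best-effort EU check from a query_prod_workspace_info result.

    Assumption: an EU dataplane's name starts with "EU" (case-insensitive).
    """
    if info is None:
        return None  # the tool returns None when the workspace is not found
    name = info.get("dataplane_name")
    if not name:
        return None  # region unknown
    return name.strip().upper().startswith("EU")


print(is_eu_workspace({"workspace_id": "w1", "dataplane_name": "EU"}))
print(is_eu_workspace({"workspace_id": "w2", "dataplane_name": "US-Central"}))
print(is_eu_workspace(None))
```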
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | — | Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
}
},
"required": [
"workspace_id"
],
"type": "object"
}
Output JSON schema:
{
"properties": {
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_workspaces_by_email_domain
Hints: read-only · idempotent
Find workspaces by email domain.
This tool searches for workspaces where users have email addresses matching the specified domain. This is useful for identifying workspaces belonging to specific companies; for example, searching for "motherduck.com" will find workspaces belonging to MotherDuck employees.
Use cases:
- Finding partner organization connections for testing connector fixes
- Identifying internal test accounts for specific integrations
- Locating workspaces belonging to technology partners
The returned organization IDs can be used with other tools, such as query_prod_connections_by_connector, to find connections within those organizations for safe testing.
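The email_domain argument must be passed without the '@'. The result's `unique_organization_ids` is a deduplicated list. Below is a client-side sketch of both ideas, with hypothetical helper names and made-up rows. The tool's own matching rule is not documented. This sketch **assumes** an exact, case-insensitive domain match, so subdomains do not match.

```python
from typing import Any


def normalize_email_domain(domain: str) -> str:
    """Strip whitespace and a leading '@', and lowercase: '@MotherDuck.com' -> 'motherduck.com'."""
    return domain.strip().lstrip("@").lower()


def email_in_domain(email: str, domain: str) -> bool:
    """Exact-domain match; subdomains such as 'eu.motherduck.com' do not match."""
    return email.strip().lower().endswith("@" + normalize_email_domain(domain))


def unique_org_ids(workspaces: list[dict[str, Any]]) -> list[str]:
    """Deduplicate organization IDs while keeping first-seen order."""
    return list(dict.fromkeys(ws["organization_id"] for ws in workspaces))


# Illustrative rows; values are made up.
workspaces = [
    {"organization_id": "org-a", "workspace_id": "w1", "email": "dev@motherduck.com"},
    {"organization_id": "org-a", "workspace_id": "w2", "email": "ops@MotherDuck.com"},
    {"organization_id": "org-b", "workspace_id": "w3", "email": "qa@example.com"},
]
matches = [ws for ws in workspaces if email_in_domain(ws["email"], "@motherduck.com")]
print(unique_org_ids(matches))  # ['org-a']
```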
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| email_domain | string | yes | — | Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain. |
| limit | integer | no | 100 | Maximum number of workspaces to return (default: 100) |
Input JSON schema:
{
"additionalProperties": false,
"properties": {
"email_domain": {
"description": "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.",
"type": "string"
},
"limit": {
"default": 100,
"description": "Maximum number of workspaces to return (default: 100)",
"type": "integer"
}
},
"required": [
"email_domain"
],
"type": "object"
}
Output JSON schema:
{
"description": "Result of looking up workspaces by email domain.",
"properties": {
"email_domain": {
"description": "The email domain that was searched for (e.g., 'motherduck.com')",
"type": "string"
},
"total_workspaces_found": {
"description": "Total number of workspaces matching the email domain",
"type": "integer"
},
"unique_organization_ids": {
"description": "List of unique organization IDs found",
"items": {
"type": "string"
},
"type": "array"
},
"workspaces": {
"description": "List of workspaces matching the email domain",
"items": {
"description": "Information about a workspace found by email domain search.",
"properties": {
"organization_id": {
"description": "The organization UUID",
"type": "string"
},
"workspace_id": {
"description": "The workspace UUID",
"type": "string"
},
"workspace_name": {
"description": "The name of the workspace",
"type": "string"
},
"slug": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The workspace slug (URL-friendly identifier)"
},
"email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The email address associated with the workspace"
},
"dataplane_group_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The dataplane group UUID (region)"
},
"dataplane_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the dataplane (e.g., 'US', 'EU')"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the workspace was created"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
},
"is_eu": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether the workspace is in the EU region (derived from dataplane_name)."
}
},
"required": [
"organization_id",
"workspace_id",
"workspace_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"email_domain",
"total_workspaces_found",
"unique_organization_ids",
"workspaces"
],
"type": "object"
}
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from
airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

## MCP reference

.. include:: ../../../docs/mcp-generated/prod_db_queries.md
   :start-line: 2
"""

from __future__ import annotations

__all__: list[str] = []

import json
from datetime import datetime, timezone
from enum import StrEnum
from typing import Annotated, Any

from airbyte.exceptions import PyAirbyteInputError
from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte_ops_mcp.cloud_admin.registry_lookup import (
    resolve_canonical_name_to_definition_id,
)
from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum
from airbyte_ops_mcp.prod_db_access.queries import (
    query_actors_pinned_to_version,
    query_connection_sync_activity_from_prod,
    query_connections_by_connector,
    query_connections_by_destination_connector,
    query_connections_by_stream,
    query_connector_rollouts,
    query_connector_versions,
    query_dataplanes_list,
    query_destination_connection_stats,
    query_failed_sync_attempts_for_connector,
    query_new_connector_releases,
    query_recent_syncs_for_connector,
    query_source_connection_stats,
    query_syncs_for_version_pinned_connector,
    query_workspace_info,
    query_workspaces_by_email_domain,
)
from airbyte_ops_mcp.tier_cache import (
    TierFilter,
    enrich_rows_by_org,
    filter_rows_by_tier,
    get_org_tiers,
)


class StatusFilter(StrEnum):
    """Filter for job status in sync queries."""

    ALL = "all"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Cloud UI base URL for building connection URLs
CLOUD_UI_BASE_URL = "https://cloud.airbyte.com"


def _validate_sync_activity_scope(
    *,
    organization_id: str | None,
    workspace_id: str | None,
    connection_ids: list[str] | None,
) -> None:
    """Require at least one explicit scope filter for sync activity queries."""
    if organization_id or workspace_id or connection_ids:
        return
    raise PyAirbyteInputError(
        message=(
            "Provide at least one scope filter: `organization_id`, `workspace_id`, "
            "or `connection_ids`."
        ),
        context={
            "organization_id": organization_id,
            "workspace_id": workspace_id,
            "connection_ids": connection_ids,
        },
    )


def _validate_sync_activity_window(
    *,
    start_at: datetime,
    end_at: datetime,
) -> tuple[datetime, datetime]:
    """Validate that `start_at` and `end_at` describe a usable window.

    Returns the timestamps normalized to UTC. Raises `PyAirbyteInputError` for
    naive timestamps or inverted ranges. No clock-relative caps are enforced
    here; the caller is trusted to choose a sensible window.
    """
    if start_at.tzinfo is None or end_at.tzinfo is None:
        raise PyAirbyteInputError(
            message="`start_at` and `end_at` must include timezone information.",
            context={
                "start_at": start_at.isoformat(),
                "end_at": end_at.isoformat(),
            },
        )

    normalized_start = start_at.astimezone(timezone.utc)
    normalized_end = end_at.astimezone(timezone.utc)

    if normalized_start >= normalized_end:
        raise PyAirbyteInputError(
            message="`start_at` must be earlier than `end_at`.",
            context={
                "start_at": normalized_start.isoformat(),
                "end_at": normalized_end.isoformat(),
            },
        )
    return normalized_start, normalized_end


# =============================================================================
# Pydantic Models for MCP Tool Responses
# =============================================================================


class WorkspaceInfo(BaseModel):
    """Information about a workspace found by email domain search."""

    organization_id: str = Field(description="The organization UUID")
    workspace_id: str = Field(description="The workspace UUID")
    workspace_name: str = Field(description="The name of the workspace")
    slug: str | None = Field(
        default=None, description="The workspace slug (URL-friendly identifier)"
    )
    email: str | None = Field(
        default=None, description="The email address associated with the workspace"
    )
    dataplane_group_id: str | None = Field(
        default=None, description="The dataplane group UUID (region)"
    )
    dataplane_name: str | None = Field(
        default=None, description="The name of the dataplane (e.g., 'US', 'EU')"
    )
    created_at: datetime | None = Field(
        default=None, description="When the workspace was created"
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
    )
    is_eu: bool | None = Field(
        default=None,
        description="Whether the workspace is in the EU region (derived from dataplane_name).",
    )


class WorkspacesByEmailDomainResult(BaseModel):
    """Result of looking up workspaces by email domain."""

    email_domain: str = Field(
        description="The email domain that was searched for (e.g., 'motherduck.com')"
    )
    total_workspaces_found: int = Field(
        description="Total number of workspaces matching the email domain"
    )
    unique_organization_ids: list[str] = Field(
        description="List of unique organization IDs found"
    )
    workspaces: list[WorkspaceInfo] = Field(
        description="List of workspaces matching the email domain"
    )


class LatestAttemptBreakdown(BaseModel):
    """Breakdown of connections by latest attempt status."""

    succeeded: int = Field(
        default=0, description="Connections where latest attempt succeeded"
    )
    failed: int = Field(
        default=0, description="Connections where latest attempt failed"
    )
    cancelled: int = Field(
        default=0, description="Connections where latest attempt was cancelled"
    )
    running: int = Field(
        default=0, description="Connections where latest attempt is still running"
    )
    unknown: int = Field(
        default=0,
        description="Connections with no recent attempts in the lookback window",
    )


class VersionPinStats(BaseModel):
    """Stats for connections pinned to a specific version."""

    pinned_version_id: str | None = Field(
        description="The connector version UUID (None for unpinned connections)"
    )
    docker_image_tag: str | None = Field(
        default=None, description="The docker image tag for this version"
    )
    total_connections: int = Field(description="Total number of connections")
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Breakdown by latest attempt status"
    )


class ConnectorConnectionStats(BaseModel):
    """Aggregate connection stats for a connector."""

    connector_definition_id: str = Field(description="The connector definition UUID")
    connector_type: str = Field(description="'source' or 'destination'")
    canonical_name: str | None = Field(
        default=None, description="The canonical connector name if resolved"
    )
    total_connections: int = Field(
        description="Total number of non-deprecated connections"
    )
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    pinned_connections: int = Field(
        description="Number of connections with explicit version pins"
    )
    unpinned_connections: int = Field(
        description="Number of connections on default version"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Overall breakdown by latest attempt status"
    )
    by_version: list[VersionPinStats] = Field(
        description="Stats broken down by pinned version"
    )


class ConnectorConnectionStatsResponse(BaseModel):
    """Response containing connection stats for multiple connectors."""

    sources: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for source connectors"
    )
    destinations: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for destination connectors"
    )
    lookback_days: int = Field(
        description="Lookback window used for 'active' connections"
    )
    generated_at: datetime = Field(description="When this response was generated")


def _opt_str(value: Any) -> str | None:
    """Convert a nullable value to str, returning None if the value is None/falsy."""
    return str(value) if value else None


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_dataplanes() -> list[dict[str, Any]]:
    """List all dataplane groups with workspace counts.

    Returns information about all active dataplane groups in Airbyte Cloud,
    including the number of workspaces in each. Useful for understanding
    the distribution of workspaces across regions (US, US-Central, EU).

    Returns list of dicts with keys: dataplane_group_id, dataplane_name,
    organization_id, enabled, tombstone, created_at, workspace_count
    """
    return query_dataplanes_list()


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspace_info(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="Workspace UUID or alias to look up. "
            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
        ),
    ],
) -> dict[str, Any] | None:
    """Get workspace information including dataplane group.

    Returns details about a specific workspace, including which dataplane
    (region) it belongs to. Useful for determining if a workspace is in
    the EU region for filtering purposes.

    Returns dict with keys: workspace_id, workspace_name, slug, organization_id,
    dataplane_group_id, dataplane_name, created_at, tombstone
    Or None if workspace not found.
    """
    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required

    return query_workspace_info(resolved_workspace_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_versions(
    connector_definition_id: Annotated[
        str,
        Field(description="Connector definition UUID to list versions for"),
    ],
) -> list[dict[str, Any]]:
    """List all versions for a connector definition.

    Returns all published versions of a connector, ordered by last_published
    date descending. Useful for understanding version history and finding
    specific version IDs for pinning or rollout monitoring.

    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
    release_stage, support_level, cdk_version, language, last_published, release_date
    """
    return query_connector_versions(connector_definition_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_new_connector_releases(
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
) -> list[dict[str, Any]]:
    """List recently published connector versions.

    Returns connector versions published within the specified number of days.
    Uses last_published timestamp which reflects when the version was actually
    deployed to the registry (not the changelog date).

    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
    docker_image_tag, last_published, release_date, release_stage, support_level,
    cdk_version, language, created_at
    """
    return query_new_connector_releases(days=days, limit=limit)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_actors_by_pinned_connector_version(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find pinned instances for"),
    ],
) -> list[dict[str, Any]]:
    """List actors (sources/destinations) effectively pinned to a specific connector version.

    Returns all actors that are effectively pinned to a specific connector version,
    considering all scope levels: actor-level pins, workspace-level pins, and
    organization-level pins (with actor > workspace > organization precedence).
    Useful for monitoring rollouts and understanding which customers are affected.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
    origin, description, created_at, expires_at, pin_scope_type, actor_name,
    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_actors_pinned_to_version(connector_version_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_recent_syncs_for_version_pinned_connector(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find sync results for"),
    ],
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    successful_only: Annotated[
        bool,
        Field(
            description="If True, only return successful syncs (default: False)",
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """List sync job results for actors PINNED to a specific connector version.

    IMPORTANT: This tool ONLY returns results for actors that are effectively
    pinned to the specified version via scoped_configuration at any scope level
    (actor, workspace, or organization). Most connections run unpinned and will
    NOT appear in these results.

    Use this tool when you want to monitor rollout health for actors that have been
    pinned to a pre-release or specific version. For finding healthy connections
    across ALL actors using a connector type (regardless of pinning),
    use query_prod_recent_syncs_for_connector instead.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: job_id, connection_id, job_status, started_at,
    job_updated_at, connection_name, actor_id, actor_name, connector_definition_id,
    pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name,
    organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_syncs_for_version_pinned_connector(
        connector_version_id,
        days=days,
        limit=limit,
        successful_only=successful_only,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_recent_syncs_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ],
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Provide this OR destination_canonical_name OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ],
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Provide this OR destination_definition_id OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ],
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
                "Use 'succeeded' to find healthy connections with recent successful syncs. "
                "Use 'failed' to find connections with recent failures."
            ),
            default=StatusFilter.ALL,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only syncs from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude syncs for actors that are already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all syncs)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """List recent sync jobs for ALL actors using a connector type.

    This tool finds all actors with the given connector definition and returns their
    recent sync jobs, regardless of whether they have explicit version pins. It filters
    out deleted actors, deleted workspaces, and deprecated connections.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use this tool to:
    - Find healthy connections with recent successful syncs (status_filter='succeeded')
    - Investigate connector issues across all users (status_filter='failed')
    - Get an overview of all recent sync activity (status_filter='all')

    Set exclude_pinned=True to filter out syncs for actors that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
    source_definition_id, source_canonical_name, destination_definition_id,
    or destination_canonical_name.

    Key fields in results:
    - job_status: 'succeeded', 'failed', 'cancelled', etc.
    - connection_id, connection_name: The connection that ran the sync
    - actor_id, actor_name: The source or destination actor
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one connector parameter is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a destination connector
    is_destination = (
        destination_definition_id is not None or destination_canonical_name is not None
    )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif destination_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    else:
        # We've validated exactly one param is provided, so this must be set
        assert destination_definition_id is not None
        resolved_definition_id = destination_definition_id

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_recent_syncs_for_connector(
        connector_definition_id=resolved_definition_id,
        is_destination=is_destination,
        status_filter=status_filter,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
        exclude_pinned=exclude_pinned,
    )

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_failed_sync_attempts_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of this or source_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of this or source_definition_id is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only failed attempts from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """List failed sync attempts for ALL actors using a source connector type.

    This tool finds all actors with the given connector definition and returns their
    failed sync attempts, regardless of whether they have explicit version pins.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    This is useful for investigating connector issues across all users. Use this when
    you want to find failures for a connector type regardless of which version users
    are on.

    Note: This tool only supports SOURCE connectors. For destination connectors,
    a separate tool would be needed.

    Key fields in results:
    - failure_summary: JSON containing failure details including failureType and messages
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one of the two parameters is provided
    if (source_definition_id is None) == (source_canonical_name is None):
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided, but not both."
            ),
        )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 1000)", default=1000),
    ] = 1000,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude connections whose connector is already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all connections)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """Search for all connections using a specific source or destination connector type.

    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
    It finds all connections where the source or destination connector matches the
    specified type, regardless of how the connector is named by users.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Optionally filter by organization_id to limit results to a specific organization.
    Use '@airbyte-internal' as an alias for the Airbyte internal organization.

    Set exclude_pinned=True to filter out connections that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    For source queries, returns: connection_id, connection_name, connection_url, source_id,
    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
    pin_scope_type, customer_tier, is_eu.
    For destination queries, returns: connection_id, connection_name, connection_url,
    destination_id, destination_name, destination_definition_id, workspace_id,
    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from (NULL if not pinned).
    """
    # Validate that exactly one of the four connector parameters is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a source or destination query and resolve the definition ID
    is_source_query = (
        source_definition_id is not None or source_canonical_name is not None
    )
    resolved_definition_id: str

    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    elif destination_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    else:
        resolved_definition_id = destination_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    # Query the database based on connector type
    if is_source_query:
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "source_id": str(row["source_id"]),
                "source_name": row.get("source_name", ""),
                "source_definition_id": str(row["source_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]
    else:
        # Destination query
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "destination_id": str(row["destination_id"]),
                "destination_name": row.get("destination_name", ""),
                "destination_definition_id": str(row["destination_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_destination_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It's particularly useful when validating
    connector fixes that affect specific streams - you can quickly find
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
1069 ), 1070 ) 1071 1072 resolved_definition_id: str 1073 if source_canonical_name: 1074 resolved_definition_id = resolve_canonical_name_to_definition_id( 1075 canonical_name=source_canonical_name, 1076 ) 1077 else: 1078 assert source_definition_id is not None 1079 resolved_definition_id = source_definition_id 1080 1081 resolved_organization_id = OrganizationAliasEnum.resolve(organization_id) 1082 1083 rows = [ 1084 { 1085 "organization_id": str(row.get("organization_id", "")), 1086 "workspace_id": str(row["workspace_id"]), 1087 "workspace_name": row.get("workspace_name", ""), 1088 "connection_id": str(row["connection_id"]), 1089 "connection_name": row.get("connection_name", ""), 1090 "connection_status": row.get("connection_status", ""), 1091 "connection_url": ( 1092 f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}" 1093 f"/connections/{row['connection_id']}/status" 1094 ), 1095 "source_id": str(row["source_id"]), 1096 "source_name": row.get("source_name", ""), 1097 "source_definition_id": str(row["source_definition_id"]), 1098 "dataplane_group_id": str(row.get("dataplane_group_id", "")), 1099 "dataplane_name": row.get("dataplane_name", ""), 1100 } 1101 for row in query_connections_by_stream( 1102 connector_definition_id=resolved_definition_id, 1103 stream_name=stream_name, 1104 organization_id=resolved_organization_id, 1105 limit=limit, 1106 ) 1107 ] 1108 enriched = enrich_rows_by_org(rows) 1109 return filter_rows_by_tier(enriched, customer_tier_filter) 1110 1111 1112@mcp_tool( 1113 read_only=True, 1114 idempotent=True, 1115) 1116def query_prod_workspaces_by_email_domain( 1117 email_domain: Annotated[ 1118 str, 1119 Field( 1120 description=( 1121 "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). " 1122 "Do not include the '@' symbol. This will find workspaces where users " 1123 "have email addresses with this domain." 
1124 ), 1125 ), 1126 ], 1127 limit: Annotated[ 1128 int, 1129 Field( 1130 description="Maximum number of workspaces to return (default: 100)", 1131 default=100, 1132 ), 1133 ] = 100, 1134) -> WorkspacesByEmailDomainResult: 1135 """Find workspaces by email domain. 1136 1137 This tool searches for workspaces where users have email addresses matching 1138 the specified domain. This is useful for identifying workspaces belonging to 1139 specific companies - for example, searching for "motherduck.com" will find 1140 workspaces belonging to MotherDuck employees. 1141 1142 Use cases: 1143 - Finding partner organization connections for testing connector fixes 1144 - Identifying internal test accounts for specific integrations 1145 - Locating workspaces belonging to technology partners 1146 1147 The returned organization IDs can be used with other tools like 1148 `query_prod_connections_by_connector` to find connections within 1149 those organizations for safe testing. 1150 """ 1151 # Strip leading @ if provided 1152 clean_domain = email_domain.lstrip("@") 1153 1154 # Query the database 1155 rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit) 1156 1157 # Convert rows to Pydantic models 1158 workspaces = [ 1159 WorkspaceInfo( 1160 organization_id=str(row["organization_id"]), 1161 workspace_id=str(row["workspace_id"]), 1162 workspace_name=row.get("workspace_name", ""), 1163 slug=row.get("slug"), 1164 email=row.get("email"), 1165 dataplane_group_id=_opt_str(row.get("dataplane_group_id")), 1166 dataplane_name=row.get("dataplane_name"), 1167 created_at=row.get("created_at"), 1168 ) 1169 for row in rows 1170 ] 1171 1172 # Enrich with tier annotation (annotation only, no filtering) 1173 unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces)) 1174 tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)} 1175 for ws in workspaces: 1176 tier_result = tier_results.get(ws.organization_id) 1177 if tier_result: 1178 
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspacesByEmailDomainResult(
        email_domain=clean_domain,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )


def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
        total_unknown += row_unknown

        by_version.append(
            VersionPinStats(
                pinned_version_id=str(version_id) if version_id else None,
                docker_image_tag=version_tags.get(str(version_id))
                if version_id
                else None,
                total_connections=row_total,
                enabled_connections=row_enabled,
                active_connections=row_active,
                latest_attempt=LatestAttemptBreakdown(
                    succeeded=row_succeeded,
                    failed=row_failed,
                    cancelled=row_cancelled,
                    running=row_running,
                    unknown=row_unknown,
                ),
            )
        )

    return ConnectorConnectionStats(
        connector_definition_id=connector_definition_id,
        connector_type=connector_type,
        canonical_name=canonical_name,
        total_connections=total_connections,
        enabled_connections=enabled_connections,
        active_connections=active_connections,
        pinned_connections=pinned_connections,
        unpinned_connections=unpinned_connections,
        latest_attempt=LatestAttemptBreakdown(
            succeeded=total_succeeded,
            failed=total_failed,
            cancelled=total_cancelled,
            running=total_running,
            unknown=total_unknown,
        ),
        by_version=by_version,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connector_connection_stats(
    source_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of source connector definition IDs (UUIDs) to get stats for. "
                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
            ),
            default=None,
        ),
    ] = None,
    destination_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of destination connector definition IDs (UUIDs) to get stats for. "
                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(
            description=(
                "Number of days to look back for 'active' connections (default: 7). "
                "Connections with sync activity within this window are counted as active."
            ),
            default=7,
        ),
    ] = 7,
) -> ConnectorConnectionStatsResponse:
    """Get aggregate connection stats for multiple connectors.

    Returns counts of connections grouped by pinned version for each connector,
    including:
    - Total, enabled, and active connection counts
    - Pinned vs unpinned breakdown
    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

    This tool is designed for release monitoring workflows. It allows you to:
    1. Query recently released connectors to identify which ones to monitor
    2. Get aggregate stats showing how many connections are using each version
    3. See health metrics (pass/fail) broken down by version

    The `lookback_days` parameter controls the lookback window for:
    - Counting 'active' connections (those with recent sync activity)
    - Determining 'latest attempt status' (most recent attempt within the window)

    Connections with no sync activity in the lookback window will have
    'unknown' status in the latest_attempt breakdown.
    """
    # Initialize empty lists if None
    source_ids = source_definition_ids or []
    destination_ids = destination_definition_ids or []

    if not source_ids and not destination_ids:
        raise PyAirbyteInputError(
            message=(
                "At least one of source_definition_ids or destination_definition_ids "
                "must be provided."
            ),
        )

    sources: list[ConnectorConnectionStats] = []
    destinations: list[ConnectorConnectionStats] = []

    # Process source connectors
    for source_def_id in source_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(source_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_source_connection_stats(source_def_id, days=lookback_days)

        sources.append(
            _build_connector_stats(
                connector_definition_id=source_def_id,
                connector_type="source",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    # Process destination connectors
    for dest_def_id in destination_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(dest_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_destination_connection_stats(dest_def_id, days=lookback_days)

        destinations.append(
            _build_connector_stats(
                connector_definition_id=dest_def_id,
                connector_type="destination",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    return ConnectorConnectionStatsResponse(
        sources=sources,
        destinations=destinations,
        lookback_days=lookback_days,
        generated_at=datetime.now(timezone.utc),
    )


# =============================================================================
# Connector Rollout Models and Tools
# =============================================================================


class ConnectorRolloutInfo(BaseModel):
    """Information about a connector rollout."""

    rollout_id: str = Field(description="The rollout UUID")
    actor_definition_id: str = Field(description="The connector definition UUID")
    state: str = Field(
        description="Rollout state: initialized, workflow_started, in_progress, "
        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
    )
    initial_rollout_pct: int | None = Field(
        default=None, description="Initial rollout percentage"
    )
    current_target_rollout_pct: int | None = Field(
        default=None, description="Current target rollout percentage"
    )
    final_target_rollout_pct: int | None = Field(
        default=None, description="Final target rollout percentage"
    )
    has_breaking_changes: bool = Field(
        description="Whether the RC has breaking changes"
    )
    max_step_wait_time_mins: int | None = Field(
        default=None, description="Maximum wait time between rollout steps in minutes"
    )
    rollout_strategy: str | None = Field(
        default=None, description="Rollout strategy: manual, automated, overridden"
    )
    updated_by_user_id: str | None = Field(
        default=None,
        description="User ID recorded as last updating the rollout",
    )
    updated_by_user_name: str | None = Field(
        default=None,
        description="Name recorded as last updating the rollout",
    )
    updated_by_user_email: str | None = Field(
        default=None,
        description="Email recorded as last updating the rollout",
    )
    workflow_run_id: str | None = Field(
        default=None, description="Temporal workflow run ID"
    )
    error_msg: str | None = Field(default=None, description="Error message if errored")
    failed_reason: str | None = Field(
        default=None, description="Reason for failure if failed"
    )
    paused_reason: str | None = Field(
        default=None, description="Reason for pause if paused"
    )
    tag: str | None = Field(default=None, description="Optional tag for the rollout")
    created_at: datetime | None = Field(
        default=None, description="When the rollout was created"
    )
    updated_at: datetime | None = Field(
        default=None, description="When the rollout was last updated"
    )
    completed_at: datetime | None = Field(
        default=None, description="When the rollout completed (if terminal)"
    )
    expires_at: datetime | None = Field(
        default=None, description="When the rollout expires"
    )
    rc_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the release candidate"
    )
    rc_docker_repository: str | None = Field(
        default=None, description="Docker repository of the release candidate"
    )
    initial_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the initial version"
    )
    initial_docker_repository: str | None = Field(
        default=None, description="Docker repository of the initial version"
    )
    filters: dict[str, Any] | None = Field(
        default=None,
        description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})",
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier targeted by this rollout (extracted from filters), "
        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
    )


def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
    """Parse the rollout filters field from a database row.

    The filters field may be a JSON string, a dict, or None.
    """
    if filters_raw is None:
        return None
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
    return None


def _extract_tier_from_filters(filters_raw: Any) -> str | None:
    """Extract customer tier from rollout filters JSON.

    Supports two formats:
    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`
    """
    parsed = _parse_rollout_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: customerTierFilters list
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and len(values) == 1:
                    return str(values[0])
                if isinstance(values, list) and len(values) > 1:
                    return ", ".join(str(v) for v in values)

    # Legacy format: tierFilter dict
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict):
        tier = tier_filter.get("tier")
        if isinstance(tier, str):
            return tier

    return None


def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
    """Convert a database row to a ConnectorRolloutInfo model."""
    return ConnectorRolloutInfo(
        rollout_id=str(row["rollout_id"]),
        actor_definition_id=str(row["actor_definition_id"]),
        state=row["state"],
        initial_rollout_pct=row.get("initial_rollout_pct"),
        current_target_rollout_pct=row.get("current_target_rollout_pct"),
        final_target_rollout_pct=row.get("final_target_rollout_pct"),
        has_breaking_changes=row["has_breaking_changes"],
        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
        rollout_strategy=row.get("rollout_strategy"),
        updated_by_user_id=str(row["updated_by_user_id"])
        if row.get("updated_by_user_id") is not None
        else None,
        updated_by_user_name=row.get("updated_by_user_name"),
        updated_by_user_email=row.get("updated_by_user_email"),
        workflow_run_id=row.get("workflow_run_id"),
        error_msg=row.get("error_msg"),
        failed_reason=row.get("failed_reason"),
        paused_reason=row.get("paused_reason"),
        tag=row.get("tag"),
        created_at=row.get("created_at"),
        updated_at=row.get("updated_at"),
        completed_at=row.get("completed_at"),
        expires_at=row.get("expires_at"),
        rc_docker_image_tag=row.get("rc_docker_image_tag"),
        rc_docker_repository=row.get("rc_docker_repository"),
        initial_docker_image_tag=row.get("initial_docker_image_tag"),
        initial_docker_repository=row.get("initial_docker_repository"),
        filters=_parse_rollout_filters(row.get("filters")),
        customer_tier=_extract_tier_from_filters(row.get("filters")),
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_rollouts(
    actor_definition_id: Annotated[
        str | None,
        Field(description="Connector definition UUID to filter by (optional)"),
    ] = None,
    rollout_id: Annotated[
        str | None,
        Field(description="Specific rollout UUID to look up (optional)"),
    ] = None,
    active_only: Annotated[
        bool,
        Field(description="If true, only return active (non-terminal) rollouts"),
    ] = False,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)"),
    ] = 100,
) -> list[ConnectorRolloutInfo]:
    """Query connector rollouts with flexible filtering.

    Returns rollouts based on the provided filters. If no filters are specified,
    returns all active rollouts. Useful for monitoring rollout status and history.

    Filter behavior:
    - rollout_id: Returns that specific rollout (ignores other filters)
    - active_only: Returns only active (non-terminal) rollouts
    - actor_definition_id: Returns rollouts for that specific connector
    - No filters: Returns all active rollouts (same as active_only=True)
    """
    rows = query_connector_rollouts(
        actor_definition_id=actor_definition_id,
        rollout_id=rollout_id,
        active_only=active_only,
        limit=limit,
    )
    return [_row_to_connector_rollout_info(row) for row in rows]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connection_sync_activity(
    start_at: Annotated[
        datetime,
        Field(
            description=(
                "Inclusive start timestamp for the sync activity window. "
                "Must be timezone-aware (ISO 8601 with offset or `Z`)."
            ),
        ),
    ],
    end_at: Annotated[
        datetime,
        Field(
            description=(
                "Exclusive end timestamp for the sync activity window. "
                "Must be timezone-aware and strictly after `start_at`."
            ),
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization UUID or alias. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required. Accepts `@airbyte-internal` as an alias for the "
                "Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    workspace_id: Annotated[
        str | WorkspaceAliasEnum | None,
        Field(
            description=(
                "Optional workspace UUID or alias. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required. Accepts `@devin-ai-sandbox` as an alias for the "
                "Devin AI sandbox workspace."
            ),
            default=None,
        ),
    ] = None,
    connection_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "Optional list of connection UUIDs. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required."
            ),
            default=None,
        ),
    ] = None,
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: `all` (default), `succeeded`, or "
                "`failed`. Applied to `jobs.status` in the Prod DB Replica."
            ),
            default=StatusFilter.ALL,
        ),
    ] = StatusFilter.ALL,
    limit: Annotated[
        int,
        Field(
            description="Maximum number of attempt rows to return.",
            default=1000,
        ),
    ] = 1000,
) -> list[dict[str, Any]]:
    """List recent sync jobs and attempts from the Prod DB Replica.

    Returns one row per `(job, attempt)` pair for sync jobs whose `updated_at`
    falls in `[start_at, end_at)`, scoped to the provided organization,
    workspace, or connection IDs. Designed for live operational lookups —
    e.g. "what happened on this connection in the last hour" — not for
    historical analysis.

    Each row is enriched with `customer_tier` and `is_eu` for the owning
    organization. Tier filtering is intentionally not applied — this is a
    read-only observability query.

    Input requirements:
    - At least one of `organization_id`, `workspace_id`, or `connection_ids`
      must be provided (any combination is accepted).
    - `start_at` and `end_at` must be timezone-aware and `start_at < end_at`.

    Key fields in each row:
    - `job_id`, `attempt_id`, `attempt_number`
    - `job_status`, `attempt_status`
    - `job_started_at`, `job_updated_at`, `attempt_ended_at`
    - `failure_summary` (JSON; populated when an attempt failed)
    - `connection_id`, `connection_name`, `connection_status`
    - `source_actor_id`, `source_actor_name`, `source_actor_definition_id`
    - `destination_actor_id`, `destination_actor_name`,
      `destination_actor_definition_id`
    - `workspace_id`, `workspace_name`, `organization_id`
    - `dataplane_group_id`, `dataplane_name`
    - `customer_tier`, `is_eu` (added by tier enrichment)
    """
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    _validate_sync_activity_scope(
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
    )
    normalized_start_at, normalized_end_at = _validate_sync_activity_window(
        start_at=start_at,
        end_at=end_at,
    )

    rows = query_connection_sync_activity_from_prod(
        start_at=normalized_start_at,
        end_at=normalized_end_at,
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
        status_filter=status_filter.value,
        limit=limit,
    )
    return enrich_rows_by_org(rows)


def register_prod_db_query_tools(app: FastMCP) -> None:
    """Register prod DB query tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)