airbyte_ops_mcp.mcp.prod_db_queries
MCP tools for querying the Airbyte Cloud Prod DB Replica.
This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.
MCP reference
MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 13 tools, 0 prompts, 0 resources.
Tools (13)
query_prod_actors_by_pinned_connector_version
Hints: read-only · idempotent
List actors (sources/destinations) effectively pinned to a specific connector version.
Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.
The actor_id field is the actor ID, which covers both source IDs and destination IDs.
Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
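The precedence rule described above (actor > workspace > organization) can be sketched in a few lines. This is only an illustration: the `effective_pin` helper and the shape of the `pins` dicts are hypothetical and are not part of airbyte_ops_mcp, and the tool's actual query may resolve precedence differently.

```python
from typing import Optional

# Scope levels in precedence order, highest first, as described above.
SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def effective_pin(pins: list[dict]) -> Optional[dict]:
    """Return the pin that wins for one actor, or None if nothing is pinned.

    `pins` is a hypothetical list of candidate pins that apply to the actor,
    each a dict with at least `pin_scope_type` and `connector_version_id`.
    """
    by_scope = {pin["pin_scope_type"]: pin for pin in pins}
    for scope in SCOPE_PRECEDENCE:
        if scope in by_scope:
            return by_scope[scope]
    return None


# Made-up example data: an organization-level pin and an actor-level pin.
candidates = [
    {"pin_scope_type": "organization", "connector_version_id": "version-a"},
    {"pin_scope_type": "actor", "connector_version_id": "version-b"},
]
winner = effective_pin(candidates)
print(winner["pin_scope_type"], winner["connector_version_id"])
```

In this sketch the actor-level pin wins, so the actor would appear in results for `version-b` with a pin_scope_type of 'actor'.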
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector_version_id` | `string` | yes | — | Connector version UUID to find pinned instances for |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"description": "Connector version UUID to find pinned instances for",
"type": "string"
}
},
"required": [
"connector_version_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
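The output schema above wraps the list of rows in a top-level `result` key. A client might unwrap it along these lines; `unwrap_result` is a hypothetical helper, the payload is made-up sample data, and how the structured content reaches your code depends on the MCP client in use.

```python
from collections import Counter


def unwrap_result(structured_content: dict) -> list:
    """Return the row list from a wrapped tool output shaped like the schema above."""
    if "result" not in structured_content:
        raise KeyError("expected a 'result' key in the wrapped tool output")
    return structured_content["result"]


# Made-up payload with only two of the documented keys per row.
payload = {
    "result": [
        {"actor_id": "hypothetical-actor-1", "pin_scope_type": "actor"},
        {"actor_id": "hypothetical-actor-2", "pin_scope_type": "workspace"},
        {"actor_id": "hypothetical-actor-3", "pin_scope_type": "actor"},
    ]
}
rows = unwrap_result(payload)
# Count how many effective pins come from each scope level.
scope_counts = Counter(row["pin_scope_type"] for row in rows)
print(scope_counts)
```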
query_prod_connections_by_connector
Hints: read-only · idempotent · open-world
Search for all connections using a specific source or destination connector type.
This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include results from every tier.
Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.
Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
- For source queries, returns: connection_id, connection_name, connection_url, source_id, source_name, source_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
- For destination queries, returns: connection_id, connection_name, connection_url, destination_id, destination_name, destination_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from (NULL if not pinned).
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `source_definition_id` | `string \| null` | no | null | Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| `source_canonical_name` | `string \| null` | no | null | Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| `destination_definition_id` | `string \| null` | no | null | Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB. |
| `destination_canonical_name` | `string \| null` | no | null | Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'. |
| `organization_id` | `string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null` | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| `limit` | `integer` | no | 1000 | Maximum number of results (default: 1000) |
| `customer_tier_filter` | `enum("TIER_0", "TIER_1", "TIER_2", "ALL")` | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| `exclude_pinned` | `boolean` | no | false | If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections). |
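The parameter descriptions above say that exactly one of the four connector selectors must be given, a rule the JSON schema alone does not express. A caller could check this before invoking the tool, roughly as follows. `build_connection_query_args` is a hypothetical client-side helper, not part of airbyte_ops_mcp, and the server may validate its arguments differently.

```python
from typing import Optional

VALID_TIERS = {"TIER_0", "TIER_1", "TIER_2", "ALL"}


def build_connection_query_args(
    source_definition_id: Optional[str] = None,
    source_canonical_name: Optional[str] = None,
    destination_definition_id: Optional[str] = None,
    destination_canonical_name: Optional[str] = None,
    customer_tier_filter: str = "TIER_2",
    exclude_pinned: bool = False,
    limit: int = 1000,
) -> dict:
    """Build an arguments dict for query_prod_connections_by_connector.

    Hypothetical helper: rejects calls that do not name exactly one
    connector selector before anything is sent to the server.
    """
    selectors = {
        "source_definition_id": source_definition_id,
        "source_canonical_name": source_canonical_name,
        "destination_definition_id": destination_definition_id,
        "destination_canonical_name": destination_canonical_name,
    }
    provided = {name: value for name, value in selectors.items() if value is not None}
    if len(provided) != 1:
        raise ValueError(
            f"Expected exactly one connector selector, got {len(provided)}: {sorted(provided)}"
        )
    if customer_tier_filter not in VALID_TIERS:
        raise ValueError(f"Unknown customer_tier_filter: {customer_tier_filter!r}")
    return {
        **provided,
        "customer_tier_filter": customer_tier_filter,
        "exclude_pinned": exclude_pinned,
        "limit": limit,
    }


# Look for unpinned YouTube Analytics connections across all tiers.
args = build_connection_query_args(
    source_canonical_name="source-youtube-analytics",
    customer_tier_filter="ALL",
    exclude_pinned=True,
)
print(args)
```

Failing early on the client side keeps an ambiguous request (for example, both a source and a destination selector) from reaching the database replica at all.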
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 1000,
"description": "Maximum number of results (default: 1000)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connections_by_stream
Hints: read-only · idempotent · open-world
Find connections that have a specific stream enabled in their catalog.
This tool searches the connection's configured catalog (JSONB) for streams matching the specified name. It is particularly useful when validating connector fixes that affect specific streams, because you can quickly find customer connections that use the affected stream.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include results from every tier.
Use cases:
- Finding connections with a specific stream enabled for regression testing
- Validating connector fixes that affect particular streams
- Identifying which customers use rarely-enabled streams
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
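To make the exact-name matching concrete, here is a minimal sketch of the check applied to one catalog. It assumes the catalog follows the usual configured-catalog layout (a `streams` list whose entries hold a `stream` object with a `name`); the layout stored in the database, and the way the tool actually searches it, may differ. `catalog_has_stream` is a hypothetical helper.

```python
def catalog_has_stream(catalog: dict, stream_name: str) -> bool:
    """Return True if the catalog contains a stream with exactly this name."""
    for entry in catalog.get("streams", []):
        # Exact, case-sensitive comparison, mirroring the stream_name description.
        if entry.get("stream", {}).get("name") == stream_name:
            return True
    return False


# Made-up catalog with two configured streams.
catalog = {
    "streams": [
        {"stream": {"name": "campaigns"}, "syncMode": "incremental"},
        {"stream": {"name": "global_exclusions"}, "syncMode": "full_refresh"},
    ]
}
print(catalog_has_stream(catalog, "global_exclusions"))  # True
print(catalog_has_stream(catalog, "Campaigns"))  # False: the match is case-sensitive
```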
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `stream_name` | `string` | yes | — | Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'. |
| `source_definition_id` | `string \| null` | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| `source_canonical_name` | `string \| null` | no | null | Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'. |
| `organization_id` | `string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null` | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
| `customer_tier_filter` | `enum("TIER_0", "TIER_1", "TIER_2", "ALL")` | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"stream_name": {
"description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
"type": "string"
},
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connector_connection_stats
Hints: read-only · idempotent · open-world
Get aggregate connection stats for multiple connectors.
Returns counts of connections grouped by pinned version for each connector, including:
- Total, enabled, and active connection counts
- Pinned vs unpinned breakdown
- Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)
This tool is designed for release monitoring workflows. It allows you to:
- Query recently released connectors to identify which ones to monitor
- Get aggregate stats showing how many connections are using each version
- See health metrics (pass/fail) broken down by version
The 'active_within_days' parameter controls the lookback window for:
- Counting 'active' connections (those with recent sync activity)
- Determining 'latest attempt status' (most recent attempt within the window)
Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
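The lookback behaviour can be sketched as follows: for each connection, only attempts inside the window are considered, the most recent of those decides the status, and a connection with none counts as 'unknown'. This is an illustrative approximation. `latest_attempt_breakdown` and its input shape are hypothetical, and whether the real query treats the window boundary as inclusive is an assumption here.

```python
from datetime import datetime, timedelta, timezone

STATUSES = ("succeeded", "failed", "cancelled", "running", "unknown")


def latest_attempt_breakdown(attempts_by_connection, now, active_within_days=7):
    """Count connections by the status of their latest attempt in the window.

    `attempts_by_connection` maps a connection ID to a list of
    (timestamp, status) tuples; this shape is made up for the sketch.
    """
    cutoff = now - timedelta(days=active_within_days)
    counts = {status: 0 for status in STATUSES}
    for attempts in attempts_by_connection.values():
        # Assumption: attempts exactly at the cutoff count as inside the window.
        recent = [attempt for attempt in attempts if attempt[0] >= cutoff]
        if not recent:
            counts["unknown"] += 1
            continue
        _, status = max(recent, key=lambda attempt: attempt[0])
        counts[status if status in counts else "unknown"] += 1
    return counts


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
sample = {
    "conn-a": [(now - timedelta(days=1), "failed"), (now - timedelta(hours=2), "succeeded")],
    "conn-b": [(now - timedelta(days=3), "failed")],
    "conn-c": [(now - timedelta(days=30), "succeeded")],
}
print(latest_attempt_breakdown(sample, now))
```

Here `conn-c` last synced 30 days ago, so with the default 7-day window it lands in 'unknown' even though that sync succeeded.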
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `source_definition_ids` | `array<string> \| null` | no | null | List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139'] |
| `destination_definition_ids` | `array<string> \| null` | no | null | List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610'] |
| `active_within_days` | `integer` | no | 7 | Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
},
"destination_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
},
"active_within_days": {
"default": 7,
"description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Response containing connection stats for multiple connectors.",
"properties": {
"sources": {
"description": "Stats for source connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"destinations": {
"description": "Stats for destination connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"active_within_days": {
"description": "Lookback window used for 'active' connections",
"type": "integer"
},
"generated_at": {
"description": "When this response was generated",
"format": "date-time",
"type": "string"
}
},
"required": [
"active_within_days",
"generated_at"
],
"type": "object"
}
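For release monitoring, the `by_version` entries in this response can be reduced to a per-version failure rate. The sketch below reads only fields named in the output schema above; `failure_rate_by_version` is a hypothetical helper, the sample numbers are invented, and defining the rate as failed / (succeeded + failed) is one choice among several.

```python
from typing import Optional


def failure_rate_by_version(connector_stats: dict) -> dict[str, Optional[float]]:
    """Map each pinned version (or 'unpinned') to its latest-attempt failure rate.

    Returns None for a version with no succeeded or failed attempts in the window.
    """
    rates: dict[str, Optional[float]] = {}
    for version in connector_stats["by_version"]:
        key = version["pinned_version_id"] or "unpinned"
        latest = version["latest_attempt"]
        failed = latest.get("failed", 0)
        finished = latest.get("succeeded", 0) + failed
        rates[key] = failed / finished if finished else None
    return rates


# Invented sample shaped like one entry of `sources` in the response.
sample_stats = {
    "connector_definition_id": "hypothetical-definition-id",
    "by_version": [
        {
            "pinned_version_id": "hypothetical-version-id",
            "latest_attempt": {"succeeded": 8, "failed": 2, "unknown": 1},
        },
        {
            "pinned_version_id": None,
            "latest_attempt": {"running": 3},
        },
    ],
}
print(failure_rate_by_version(sample_stats))
```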
query_prod_connector_rollouts
Hints: read-only · idempotent
Query connector rollouts with flexible filtering.
Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.
Filter behavior:
- rollout_id: Returns that specific rollout (ignores other filters)
- active_only: Returns only active (non-terminal) rollouts
- actor_definition_id: Returns rollouts for that specific connector
- No filters: Returns all active rollouts (same as active_only=True)
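One way to read the filter behavior above is sketched below against an in-memory list. This is an interpretation, not the tool's query: `select_rollouts` is hypothetical, and the set of terminal states is an assumption, since the text above says only "non-terminal".

```python
# Assumption: which states count as terminal is a guess made for this sketch.
ASSUMED_TERMINAL_STATES = frozenset({"succeeded", "failed_rolled_back", "canceled"})


def select_rollouts(rollouts, rollout_id=None, actor_definition_id=None,
                    active_only=False, limit=100):
    """Apply the documented filter behavior to a list of rollout dicts."""
    # rollout_id wins outright and ignores the other filters.
    if rollout_id is not None:
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:limit]
    no_filters = actor_definition_id is None and not active_only
    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    # With no filters at all, behave as if active_only were true.
    if active_only or no_filters:
        selected = [r for r in selected if r["state"] not in ASSUMED_TERMINAL_STATES]
    return selected[:limit]


# Made-up rollouts for illustration.
rollouts = [
    {"rollout_id": "r1", "actor_definition_id": "d1", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "d1", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "d2", "state": "paused"},
]
print([r["rollout_id"] for r in select_rollouts(rollouts)])
print([r["rollout_id"] for r in select_rollouts(rollouts, actor_definition_id="d1")])
```

Under this reading, filtering by `actor_definition_id` alone also returns finished rollouts, which is what makes it usable for rollout history.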
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `actor_definition_id` | `string \| null` | no | null | Connector definition UUID to filter by (optional) |
| `rollout_id` | `string \| null` | no | null | Specific rollout UUID to look up (optional) |
| `active_only` | `boolean` | no | false | If true, only return active (non-terminal) rollouts |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"actor_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector definition UUID to filter by (optional)"
},
"rollout_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Specific rollout UUID to look up (optional)"
},
"active_only": {
"default": false,
"description": "If true, only return active (non-terminal) rollouts",
"type": "boolean"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a connector rollout.",
"properties": {
"rollout_id": {
"description": "The rollout UUID",
"type": "string"
},
"actor_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"state": {
"description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
"type": "string"
},
"initial_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Initial rollout percentage"
},
"current_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Current target rollout percentage"
},
"final_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Final target rollout percentage"
},
"has_breaking_changes": {
"description": "Whether the RC has breaking changes",
"type": "boolean"
},
"max_step_wait_time_mins": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Maximum wait time between rollout steps in minutes"
},
"rollout_strategy": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Rollout strategy: manual, automated, overridden"
},
"workflow_run_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Temporal workflow run ID"
},
"error_msg": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Error message if errored"
},
"failed_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for failure if failed"
},
"paused_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for pause if paused"
},
"tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional tag for the rollout"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was created"
},
"updated_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was last updated"
},
"completed_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout completed (if terminal)"
},
"expires_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout expires"
},
"rc_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the release candidate"
},
"rc_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the release candidate"
},
"initial_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the initial version"
},
"initial_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the initial version"
},
"filters": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
}
},
"required": [
"rollout_id",
"actor_definition_id",
"state",
"has_breaking_changes"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connector_versions
Hints: read-only · idempotent
List all versions for a connector definition.
Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.
Returns a list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
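Because results are ordered by last_published descending, the first row is the most recently published version, and a specific version ID can be found by scanning for its image tag. The sketch below uses invented rows with a subset of the keys listed above; `find_version_id` is a hypothetical helper.

```python
from typing import Optional


def find_version_id(versions: list[dict], docker_image_tag: str) -> Optional[str]:
    """Return the version_id whose docker_image_tag matches, or None."""
    for version in versions:
        if version.get("docker_image_tag") == docker_image_tag:
            return version.get("version_id")
    return None


# Invented rows, already in the order the tool returns them (newest first).
versions = [
    {"version_id": "hypothetical-version-id-2", "docker_image_tag": "1.1.0",
     "last_published": "2024-03-01"},
    {"version_id": "hypothetical-version-id-1", "docker_image_tag": "1.0.0",
     "last_published": "2024-01-15"},
]
newest = versions[0]
print(newest["docker_image_tag"])
print(find_version_id(versions, "1.0.0"))
```

The resulting version ID is the kind of value that query_prod_actors_by_pinned_connector_version takes as `connector_version_id`.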
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector_definition_id` | `string` | yes | — | Connector definition UUID to list versions for |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_definition_id": {
"description": "Connector definition UUID to list versions for",
"type": "string"
}
},
"required": [
"connector_definition_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_dataplanes
Hints: read-only · idempotent
List all dataplane groups with workspace counts.
Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).
Returns a list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
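As a small illustration of reading the distribution from these rows, the sketch below turns workspace counts into shares per dataplane. The rows and counts are invented and `workspace_share_by_dataplane` is a hypothetical helper.

```python
def workspace_share_by_dataplane(dataplanes: list[dict]) -> dict[str, float]:
    """Return each dataplane's fraction of all workspaces (0.0 if there are none)."""
    total = sum(row["workspace_count"] for row in dataplanes)
    if total == 0:
        return {row["dataplane_name"]: 0.0 for row in dataplanes}
    return {row["dataplane_name"]: row["workspace_count"] / total for row in dataplanes}


# Invented rows with only the two keys this sketch needs.
dataplanes = [
    {"dataplane_name": "US", "workspace_count": 75},
    {"dataplane_name": "EU", "workspace_count": 25},
]
print(workspace_share_by_dataplane(dataplanes))
```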
Parameters:
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_failed_sync_attempts_for_connector
Hints: read-only · idempotent · open-world
List failed sync attempts for ALL actors using a source connector type.
This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include results from every tier.
Use this tool to investigate connector issues across all users, whichever version they are on.
Note: This tool only supports SOURCE connectors. For destination connectors, a separate tool would be needed.
Key fields in results:
- failure_summary: JSON containing failure details including failureType and messages
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `source_definition_id` | `string \| null` | no | null | Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| `source_canonical_name` | `string \| null` | no | null | Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| `organization_id` | `string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null` | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| `days` | `integer` | no | 7 | Number of days to look back (default: 7) |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
| `customer_tier_filter` | `enum("TIER_0", "TIER_1", "TIER_2", "ALL")` | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
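The `failure_summary` field is described above only as JSON containing `failureType` and messages. A minimal sketch of tallying failure types across result rows follows, assuming (this is not confirmed by the reference) that the JSON holds a `failures` list whose entries each carry a `failureType`. The helper name and the sample values are hypothetical.

```python
import json
from collections import Counter
from typing import Any


def count_failure_types(rows: list[dict[str, Any]]) -> Counter:
    """Count failureType values across failed-attempt rows."""
    counts: Counter = Counter()
    for row in rows:
        summary = row.get("failure_summary")
        # The field may arrive as a JSON string or as an already-decoded dict.
        if isinstance(summary, str):
            try:
                summary = json.loads(summary)
            except json.JSONDecodeError:
                summary = None
        if not isinstance(summary, dict):
            counts["unparsed"] += 1
            continue
        # ASSUMPTION: individual failures live under a "failures" key.
        for failure in summary.get("failures") or []:
            counts[failure.get("failureType", "unknown")] += 1
    return counts


# Hypothetical rows covering string, dict, invalid, and missing summaries.
sample_rows = [
    {"failure_summary": '{"failures": [{"failureType": "config_error"}]}'},
    {
        "failure_summary": {
            "failures": [
                {"failureType": "system_error"},
                {"failureType": "config_error"},
            ]
        }
    },
    {"failure_summary": "not json"},
    {"failure_summary": None},
]
failure_counts = count_failure_types(sample_rows)
```

Counting unparseable summaries separately, rather than dropping them, keeps the totals honest when the assumed shape does not hold.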
query_prod_new_connector_releases
Hints: read-only · idempotent
List recently published connector versions.
Returns connector versions published within the specified number of days. Uses the last_published timestamp, which reflects when the version was actually deployed to the registry (not the changelog date).
Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `days` | `integer` | no | 7 | Number of days to look back (default: 7) |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
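To make the `days` and `limit` parameters concrete, here is an in-memory analogue of the lookback window. It is a sketch only: the real tool runs a database query that is not shown here, and both the newest-first ordering and the assumption that `last_published` is a timezone-aware `datetime` are guesses. `recent_releases` and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any


def recent_releases(
    rows: list[dict[str, Any]],
    now: datetime,
    days: int = 7,
    limit: int = 100,
) -> list[dict[str, Any]]:
    """Keep rows published within the last `days` days, newest first."""
    cutoff = now - timedelta(days=days)
    recent = [row for row in rows if row["last_published"] >= cutoff]
    # ASSUMPTION: newest-first ordering; the reference does not state the order.
    recent.sort(key=lambda row: row["last_published"], reverse=True)
    return recent[:limit]


now = datetime(2025, 1, 15, tzinfo=timezone.utc)
sample_rows = [
    {"docker_image_tag": "1.0.0", "last_published": now - timedelta(days=30)},
    {"docker_image_tag": "1.1.0", "last_published": now - timedelta(days=3)},
    {"docker_image_tag": "1.2.0", "last_published": now - timedelta(days=1)},
]
selected = recent_releases(sample_rows, now=now)
```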
query_prod_recent_syncs_for_connector
Hints: read-only · idempotent · open-world
List recent sync jobs for ALL actors using a connector type.
This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.
Results are always enriched with customer_tier and is_eu fields. A tier filter is always applied: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' to include every tier.
Use this tool to:
- Find healthy connections with recent successful syncs (status_filter='succeeded')
- Investigate connector issues across all users (status_filter='failed')
- Get an overview of all recent sync activity (status_filter='all')
Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.
Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
Key fields in results:
- job_status: 'succeeded', 'failed', 'cancelled', etc.
- connection_id, connection_name: The connection that ran the sync
- actor_id, actor_name: The source or destination actor
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `source_definition_id` | `string \| null` | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| `source_canonical_name` | `string \| null` | no | null | Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| `destination_definition_id` | `string \| null` | no | null | Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB. |
| `destination_canonical_name` | `string \| null` | no | null | Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'. |
| `status_filter` | `enum("all", "succeeded", "failed")` | no | "all" | Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures. |
| `organization_id` | `string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null` | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| `lookback_days` | `integer` | no | 7 | Number of days to look back (default: 7) |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
| `customer_tier_filter` | `enum("TIER_0", "TIER_1", "TIER_2", "ALL")` | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| `exclude_pinned` | `boolean` | no | false | If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
},
"status_filter": {
"description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
"enum": [
"all",
"succeeded",
"failed"
],
"type": "string",
"default": "all"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
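All four connector selectors are optional in the schema, yet exactly one must be supplied. The sketch below shows one way a caller might check that rule before invoking the tool. `pick_connector_selector` is a hypothetical helper, and `ValueError` is a stand-in; the error the tool itself raises is not shown in this part of the reference.

```python
from typing import Optional


def pick_connector_selector(
    source_definition_id: Optional[str] = None,
    source_canonical_name: Optional[str] = None,
    destination_definition_id: Optional[str] = None,
    destination_canonical_name: Optional[str] = None,
) -> tuple[str, str]:
    """Return the single (parameter_name, value) pair that was provided."""
    candidates = {
        "source_definition_id": source_definition_id,
        "source_canonical_name": source_canonical_name,
        "destination_definition_id": destination_definition_id,
        "destination_canonical_name": destination_canonical_name,
    }
    provided = [(name, value) for name, value in candidates.items() if value is not None]
    if len(provided) != 1:
        names = [name for name, _ in provided]
        raise ValueError(
            f"Expected exactly one connector selector, got {len(provided)}: {names}"
        )
    return provided[0]


selector = pick_connector_selector(destination_canonical_name="destination-duckdb")
```

Validating on the caller side gives a clear message before any query is attempted; the tool presumably applies its own check as well.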
query_prod_recent_syncs_for_version_pinned_connector
Hints: read-only · idempotent
List sync job results for actors PINNED to a specific connector version.
IMPORTANT: This tool ONLY returns results for actors that are effectively pinned to the specified version via scoped_configuration at any scope level (actor, workspace, or organization). Most connections run unpinned and will NOT appear in these results.
Use this tool when you want to monitor rollout health for actors that have been pinned to a pre-release or specific version. For finding healthy connections across ALL actors using a connector type (regardless of pinning), use query_prod_recent_syncs_for_connector instead.
The actor_id field is the actor ID (superset of source_id/destination_id).
Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, connector_definition_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `connector_version_id` | `string` | yes | — | Connector version UUID to find sync results for |
| `days` | `integer` | no | 7 | Number of days to look back (default: 7) |
| `limit` | `integer` | no | 100 | Maximum number of results (default: 100) |
| `successful_only` | `boolean` | no | false | If True, only return successful syncs (default: False) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"description": "Connector version UUID to find sync results for",
"type": "string"
},
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"successful_only": {
"default": false,
"description": "If True, only return successful syncs (default: False)",
"type": "boolean"
}
},
"required": [
"connector_version_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
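The notion of an "effective" pin can be sketched as a precedence lookup. Elsewhere in this reference the precedence is given as actor > workspace > organization; the code below illustrates that rule only and is not the query the tool runs. `resolve_effective_pin` and the sample version IDs are hypothetical.

```python
from typing import Optional

# Highest-precedence scope first, per the actor > workspace > organization rule.
PIN_SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def resolve_effective_pin(
    pins_by_scope: dict[str, Optional[str]],
) -> Optional[tuple[str, str]]:
    """Return (pin_scope_type, version_id) for the winning pin, or None if unpinned."""
    for scope in PIN_SCOPE_PRECEDENCE:
        version_id = pins_by_scope.get(scope)
        if version_id:
            return scope, version_id
    return None


# A workspace-level pin overrides an organization-level pin.
effective = resolve_effective_pin({"workspace": "version-ws", "organization": "version-org"})
```

The first element of the returned pair corresponds to the `pin_scope_type` value reported in the results.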
query_prod_workspace_info
Hints: read-only · idempotent
Get workspace information including dataplane group.
Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.
Returns a dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone. Returns None if the workspace is not found.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `workspace_id` | `string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61")` | yes | — | Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
}
},
"required": [
"workspace_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
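Because the result may be None, callers deriving an EU flag need to handle three cases: workspace missing, dataplane unknown, and dataplane known. The sketch below assumes the EU dataplane is literally named 'EU' (compared case-insensitively); the actual matching rule is not documented here. `workspace_is_eu` is a hypothetical helper.

```python
from typing import Any, Optional


def workspace_is_eu(info: Optional[dict[str, Any]]) -> Optional[bool]:
    """Return True/False for a known dataplane, or None when it cannot be determined."""
    if info is None:
        # The tool returns None when the workspace is not found.
        return None
    name = info.get("dataplane_name")
    if name is None:
        return None
    # ASSUMPTION: the EU dataplane is named "EU".
    return name.strip().upper() == "EU"


eu_flag = workspace_is_eu({"workspace_id": "example", "dataplane_name": "EU"})
```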
query_prod_workspaces_by_email_domain
Hints: read-only · idempotent
Find workspaces by email domain.
This tool searches for workspaces where users have email addresses matching the specified domain. This is useful for identifying workspaces belonging to specific companies - for example, searching for "motherduck.com" will find workspaces belonging to MotherDuck employees.
Use cases:
- Finding partner organization connections for testing connector fixes
- Identifying internal test accounts for specific integrations
- Locating workspaces belonging to technology partners
The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| `email_domain` | `string` | yes | — | Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain. |
| `limit` | `integer` | no | 100 | Maximum number of workspaces to return (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"email_domain": {
"description": "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.",
"type": "string"
},
"limit": {
"default": 100,
"description": "Maximum number of workspaces to return (default: 100)",
"type": "integer"
}
},
"required": [
"email_domain"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of looking up workspaces by email domain.",
"properties": {
"email_domain": {
"description": "The email domain that was searched for (e.g., 'motherduck.com')",
"type": "string"
},
"total_workspaces_found": {
"description": "Total number of workspaces matching the email domain",
"type": "integer"
},
"unique_organization_ids": {
"description": "List of unique organization IDs found",
"items": {
"type": "string"
},
"type": "array"
},
"workspaces": {
"description": "List of workspaces matching the email domain",
"items": {
"description": "Information about a workspace found by email domain search.",
"properties": {
"organization_id": {
"description": "The organization UUID",
"type": "string"
},
"workspace_id": {
"description": "The workspace UUID",
"type": "string"
},
"workspace_name": {
"description": "The name of the workspace",
"type": "string"
},
"slug": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The workspace slug (URL-friendly identifier)"
},
"email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The email address associated with the workspace"
},
"dataplane_group_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The dataplane group UUID (region)"
},
"dataplane_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the dataplane (e.g., 'US', 'EU')"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the workspace was created"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
},
"is_eu": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether the workspace is in the EU region (derived from dataplane_name)."
}
},
"required": [
"organization_id",
"workspace_id",
"workspace_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"email_domain",
"total_workspaces_found",
"unique_organization_ids",
"workspaces"
],
"type": "object"
}
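The four top-level fields of the output schema can be derived from the workspace rows alone. The following sketch uses plain dicts rather than the module's Pydantic models so that it stays self-contained; the first-seen ordering of `unique_organization_ids` is an assumption, and `summarize_workspaces` and the sample IDs are hypothetical.

```python
from typing import Any


def summarize_workspaces(email_domain: str, rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Build a result dict shaped like the documented output schema."""
    # dict.fromkeys de-duplicates while preserving first-seen order.
    unique_org_ids = list(dict.fromkeys(row["organization_id"] for row in rows))
    return {
        "email_domain": email_domain,
        "total_workspaces_found": len(rows),
        "unique_organization_ids": unique_org_ids,
        "workspaces": rows,
    }


# Hypothetical rows carrying only the three required workspace fields.
sample_rows = [
    {"organization_id": "org-a", "workspace_id": "ws-1", "workspace_name": "Main"},
    {"organization_id": "org-b", "workspace_id": "ws-2", "workspace_name": "Staging"},
    {"organization_id": "org-a", "workspace_id": "ws-3", "workspace_name": "Sandbox"},
]
result = summarize_workspaces("motherduck.com", sample_rows)
```

The `unique_organization_ids` list is the piece most useful for follow-up calls to tools that accept an `organization_id` filter.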
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for querying the Airbyte Cloud Prod DB Replica. 3 4This module provides MCP tools that wrap the query functions from 5airbyte_ops_mcp.prod_db_access.queries for use by AI agents. 6 7## MCP reference 8 9.. include:: ../../../docs/mcp-generated/prod_db_queries.md 10 :start-line: 2 11""" 12 13from __future__ import annotations 14 15__all__: list[str] = [] 16 17import json 18from datetime import datetime, timezone 19from enum import StrEnum 20from typing import Annotated, Any 21 22import requests 23from airbyte.exceptions import PyAirbyteInputError 24from fastmcp import FastMCP 25from fastmcp_extensions import mcp_tool, register_mcp_tools 26from pydantic import BaseModel, Field 27 28from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum 29from airbyte_ops_mcp.prod_db_access.queries import ( 30 query_actors_pinned_to_version, 31 query_connections_by_connector, 32 query_connections_by_destination_connector, 33 query_connections_by_stream, 34 query_connector_rollouts, 35 query_connector_versions, 36 query_dataplanes_list, 37 query_destination_connection_stats, 38 query_failed_sync_attempts_for_connector, 39 query_new_connector_releases, 40 query_recent_syncs_for_connector, 41 query_source_connection_stats, 42 query_syncs_for_version_pinned_connector, 43 query_workspace_info, 44 query_workspaces_by_email_domain, 45) 46from airbyte_ops_mcp.tier_cache import ( 47 TierFilter, 48 enrich_rows_by_org, 49 filter_rows_by_tier, 50 get_org_tiers, 51) 52 53 54class StatusFilter(StrEnum): 55 """Filter for job status in sync queries.""" 56 57 ALL = "all" 58 SUCCEEDED = "succeeded" 59 FAILED = "failed" 60 61 62# Cloud UI base URL for building connection URLs 63CLOUD_UI_BASE_URL = "https://cloud.airbyte.com" 64 65 66# ============================================================================= 67# Pydantic Models for MCP Tool Responses 68# 
============================================================================= 69 70 71class WorkspaceInfo(BaseModel): 72 """Information about a workspace found by email domain search.""" 73 74 organization_id: str = Field(description="The organization UUID") 75 workspace_id: str = Field(description="The workspace UUID") 76 workspace_name: str = Field(description="The name of the workspace") 77 slug: str | None = Field( 78 default=None, description="The workspace slug (URL-friendly identifier)" 79 ) 80 email: str | None = Field( 81 default=None, description="The email address associated with the workspace" 82 ) 83 dataplane_group_id: str | None = Field( 84 default=None, description="The dataplane group UUID (region)" 85 ) 86 dataplane_name: str | None = Field( 87 default=None, description="The name of the dataplane (e.g., 'US', 'EU')" 88 ) 89 created_at: datetime | None = Field( 90 default=None, description="When the workspace was created" 91 ) 92 customer_tier: str | None = Field( 93 default=None, 94 description="Customer tier (TIER_0, TIER_1, or TIER_2). 
Enriched from BigQuery tier cache.", 95 ) 96 is_eu: bool | None = Field( 97 default=None, 98 description="Whether the workspace is in the EU region (derived from dataplane_name).", 99 ) 100 101 102class WorkspacesByEmailDomainResult(BaseModel): 103 """Result of looking up workspaces by email domain.""" 104 105 email_domain: str = Field( 106 description="The email domain that was searched for (e.g., 'motherduck.com')" 107 ) 108 total_workspaces_found: int = Field( 109 description="Total number of workspaces matching the email domain" 110 ) 111 unique_organization_ids: list[str] = Field( 112 description="List of unique organization IDs found" 113 ) 114 workspaces: list[WorkspaceInfo] = Field( 115 description="List of workspaces matching the email domain" 116 ) 117 118 119class LatestAttemptBreakdown(BaseModel): 120 """Breakdown of connections by latest attempt status.""" 121 122 succeeded: int = Field( 123 default=0, description="Connections where latest attempt succeeded" 124 ) 125 failed: int = Field( 126 default=0, description="Connections where latest attempt failed" 127 ) 128 cancelled: int = Field( 129 default=0, description="Connections where latest attempt was cancelled" 130 ) 131 running: int = Field( 132 default=0, description="Connections where latest attempt is still running" 133 ) 134 unknown: int = Field( 135 default=0, 136 description="Connections with no recent attempts in the lookback window", 137 ) 138 139 140class VersionPinStats(BaseModel): 141 """Stats for connections pinned to a specific version.""" 142 143 pinned_version_id: str | None = Field( 144 description="The connector version UUID (None for unpinned connections)" 145 ) 146 docker_image_tag: str | None = Field( 147 default=None, description="The docker image tag for this version" 148 ) 149 total_connections: int = Field(description="Total number of connections") 150 enabled_connections: int = Field( 151 description="Number of enabled (active status) connections" 152 ) 153 
active_connections: int = Field( 154 description="Number of connections with recent sync activity" 155 ) 156 latest_attempt: LatestAttemptBreakdown = Field( 157 description="Breakdown by latest attempt status" 158 ) 159 160 161class ConnectorConnectionStats(BaseModel): 162 """Aggregate connection stats for a connector.""" 163 164 connector_definition_id: str = Field(description="The connector definition UUID") 165 connector_type: str = Field(description="'source' or 'destination'") 166 canonical_name: str | None = Field( 167 default=None, description="The canonical connector name if resolved" 168 ) 169 total_connections: int = Field( 170 description="Total number of non-deprecated connections" 171 ) 172 enabled_connections: int = Field( 173 description="Number of enabled (active status) connections" 174 ) 175 active_connections: int = Field( 176 description="Number of connections with recent sync activity" 177 ) 178 pinned_connections: int = Field( 179 description="Number of connections with explicit version pins" 180 ) 181 unpinned_connections: int = Field( 182 description="Number of connections on default version" 183 ) 184 latest_attempt: LatestAttemptBreakdown = Field( 185 description="Overall breakdown by latest attempt status" 186 ) 187 by_version: list[VersionPinStats] = Field( 188 description="Stats broken down by pinned version" 189 ) 190 191 192class ConnectorConnectionStatsResponse(BaseModel): 193 """Response containing connection stats for multiple connectors.""" 194 195 sources: list[ConnectorConnectionStats] = Field( 196 default_factory=list, description="Stats for source connectors" 197 ) 198 destinations: list[ConnectorConnectionStats] = Field( 199 default_factory=list, description="Stats for destination connectors" 200 ) 201 active_within_days: int = Field( 202 description="Lookback window used for 'active' connections" 203 ) 204 generated_at: datetime = Field(description="When this response was generated") 205 206 207def _opt_str(value: Any) -> 
str | None: 208 """Convert a nullable value to str, returning None if the value is None/falsy.""" 209 return str(value) if value else None 210 211 212# Cloud registry URL for resolving canonical names 213CLOUD_REGISTRY_URL = ( 214 "https://connectors.airbyte.com/files/registries/v0/cloud_registry.json" 215) 216 217 218def _resolve_canonical_name_to_definition_id(canonical_name: str) -> str: 219 """Resolve a canonical connector name to a definition ID. 220 221 Auto-detects whether the connector is a source or destination based on the 222 canonical name prefix ("source-" or "destination-"). If no prefix is present, 223 searches both sources and destinations. 224 225 Args: 226 canonical_name: Canonical connector name (e.g., 'source-youtube-analytics', 227 'destination-duckdb', 'YouTube Analytics', 'DuckDB'). 228 229 Returns: 230 The connector definition ID (UUID). 231 232 Raises: 233 PyAirbyteInputError: If the canonical name cannot be resolved. 234 """ 235 response = requests.get(CLOUD_REGISTRY_URL, timeout=60) 236 237 if response.status_code != 200: 238 raise PyAirbyteInputError( 239 message=f"Failed to fetch connector registry: {response.status_code}", 240 context={"response": response.text}, 241 ) 242 243 data = response.json() 244 normalized_input = canonical_name.lower().strip() 245 246 # Determine which registries to search based on prefix 247 is_source = normalized_input.startswith("source-") 248 is_destination = normalized_input.startswith("destination-") 249 250 # Search sources if it looks like a source or has no prefix 251 if is_source or not is_destination: 252 sources = data.get("sources", []) 253 for source in sources: 254 source_name = source.get("name", "").lower() 255 if source_name == normalized_input: 256 return source["sourceDefinitionId"] 257 slugified = source_name.replace(" ", "-") 258 if ( 259 slugified == normalized_input 260 or f"source-{slugified}" == normalized_input 261 ): 262 return source["sourceDefinitionId"] 263 264 # Search 
destinations if it looks like a destination or has no prefix 265 if is_destination or not is_source: 266 destinations = data.get("destinations", []) 267 for destination in destinations: 268 destination_name = destination.get("name", "").lower() 269 if destination_name == normalized_input: 270 return destination["destinationDefinitionId"] 271 slugified = destination_name.replace(" ", "-") 272 if ( 273 slugified == normalized_input 274 or f"destination-{slugified}" == normalized_input 275 ): 276 return destination["destinationDefinitionId"] 277 278 # Build appropriate error message based on what was searched 279 if is_source: 280 connector_type = "source" 281 hint = ( 282 "Use the exact canonical name (e.g., 'source-youtube-analytics') " 283 "or display name (e.g., 'YouTube Analytics')." 284 ) 285 elif is_destination: 286 connector_type = "destination" 287 hint = ( 288 "Use the exact canonical name (e.g., 'destination-duckdb') " 289 "or display name (e.g., 'DuckDB')." 290 ) 291 else: 292 connector_type = "connector" 293 hint = ( 294 "Use the exact canonical name (e.g., 'source-youtube-analytics', " 295 "'destination-duckdb') or display name (e.g., 'YouTube Analytics', 'DuckDB')." 296 ) 297 298 raise PyAirbyteInputError( 299 message=f"Could not find {connector_type} definition for canonical name: {canonical_name}", 300 context={ 301 "hint": hint 302 + " You can list available connectors using the connector registry tools.", 303 "searched_for": canonical_name, 304 }, 305 ) 306 307 308@mcp_tool( 309 read_only=True, 310 idempotent=True, 311) 312def query_prod_dataplanes() -> list[dict[str, Any]]: 313 """List all dataplane groups with workspace counts. 314 315 Returns information about all active dataplane groups in Airbyte Cloud, 316 including the number of workspaces in each. Useful for understanding 317 the distribution of workspaces across regions (US, US-Central, EU). 
318 319 Returns list of dicts with keys: dataplane_group_id, dataplane_name, 320 organization_id, enabled, tombstone, created_at, workspace_count 321 """ 322 return query_dataplanes_list() 323 324 325@mcp_tool( 326 read_only=True, 327 idempotent=True, 328) 329def query_prod_workspace_info( 330 workspace_id: Annotated[ 331 str | WorkspaceAliasEnum, 332 Field( 333 description="Workspace UUID or alias to look up. " 334 "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace." 335 ), 336 ], 337) -> dict[str, Any] | None: 338 """Get workspace information including dataplane group. 339 340 Returns details about a specific workspace, including which dataplane 341 (region) it belongs to. Useful for determining if a workspace is in 342 the EU region for filtering purposes. 343 344 Returns dict with keys: workspace_id, workspace_name, slug, organization_id, 345 dataplane_group_id, dataplane_name, created_at, tombstone 346 Or None if workspace not found. 347 """ 348 # Resolve workspace ID alias (workspace_id is required, so resolved value is never None) 349 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 350 assert resolved_workspace_id is not None # Type narrowing: workspace_id is required 351 352 return query_workspace_info(resolved_workspace_id) 353 354 355@mcp_tool( 356 read_only=True, 357 idempotent=True, 358) 359def query_prod_connector_versions( 360 connector_definition_id: Annotated[ 361 str, 362 Field(description="Connector definition UUID to list versions for"), 363 ], 364) -> list[dict[str, Any]]: 365 """List all versions for a connector definition. 366 367 Returns all published versions of a connector, ordered by last_published 368 date descending. Useful for understanding version history and finding 369 specific version IDs for pinning or rollout monitoring. 

    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
    release_stage, support_level, cdk_version, language, last_published, release_date
    """
    return query_connector_versions(connector_definition_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_new_connector_releases(
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
) -> list[dict[str, Any]]:
    """List recently published connector versions.

    Returns connector versions published within the specified number of days.
    Uses last_published timestamp which reflects when the version was actually
    deployed to the registry (not the changelog date).

    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
    docker_image_tag, last_published, release_date, release_stage, support_level,
    cdk_version, language, created_at
    """
    return query_new_connector_releases(days=days, limit=limit)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_actors_by_pinned_connector_version(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find pinned instances for"),
    ],
) -> list[dict[str, Any]]:
    """List actors (sources/destinations) effectively pinned to a specific connector version.

    Returns all actors that are effectively pinned to a specific connector version,
    considering all scope levels: actor-level pins, workspace-level pins, and
    organization-level pins (with actor > workspace > organization precedence).
    Useful for monitoring rollouts and understanding which customers are affected.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
    origin, description, created_at, expires_at, pin_scope_type, actor_name,
    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_actors_pinned_to_version(connector_version_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_recent_syncs_for_version_pinned_connector(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find sync results for"),
    ],
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    successful_only: Annotated[
        bool,
        Field(
            description="If True, only return successful syncs (default: False)",
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """List sync job results for actors PINNED to a specific connector version.

    IMPORTANT: This tool ONLY returns results for actors that are effectively
    pinned to the specified version via scoped_configuration at any scope level
    (actor, workspace, or organization). Most connections run unpinned and will
    NOT appear in these results.

    Use this tool when you want to monitor rollout health for actors that have been
    pinned to a pre-release or specific version. For finding healthy connections
    across ALL actors using a connector type (regardless of pinning),
    use query_prod_recent_syncs_for_connector instead.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: job_id, connection_id, job_status, started_at,
    job_updated_at, connection_name, actor_id, actor_name, connector_definition_id,
    pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name,
    organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_syncs_for_version_pinned_connector(
        connector_version_id,
        days=days,
        limit=limit,
        successful_only=successful_only,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_recent_syncs_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ],
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Provide this OR destination_canonical_name OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ],
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Provide this OR destination_definition_id OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ],
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
                "Use 'succeeded' to find healthy connections with recent successful syncs. "
                "Use 'failed' to find connections with recent failures."
            ),
            default=StatusFilter.ALL,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only syncs from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude syncs for actors that are already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all syncs)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """List recent sync jobs for ALL actors using a connector type.

    This tool finds all actors with the given connector definition and returns their
    recent sync jobs, regardless of whether they have explicit version pins. It filters
    out deleted actors, deleted workspaces, and deprecated connections.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use this tool to:
    - Find healthy connections with recent successful syncs (status_filter='succeeded')
    - Investigate connector issues across all users (status_filter='failed')
    - Get an overview of all recent sync activity (status_filter='all')

    Set exclude_pinned=True to filter out syncs for actors that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
    source_definition_id, source_canonical_name, destination_definition_id,
    or destination_canonical_name.

    Key fields in results:
    - job_status: 'succeeded', 'failed', 'cancelled', etc.
    - connection_id, connection_name: The connection that ran the sync
    - actor_id, actor_name: The source or destination actor
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one connector parameter is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a destination connector
    is_destination = (
        destination_definition_id is not None or destination_canonical_name is not None
    )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif destination_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    else:
        # We've validated exactly one param is provided, so this must be set
        assert destination_definition_id is not None
        resolved_definition_id = destination_definition_id

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_recent_syncs_for_connector(
        connector_definition_id=resolved_definition_id,
        is_destination=is_destination,
        status_filter=status_filter,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
        exclude_pinned=exclude_pinned,
    )

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_failed_sync_attempts_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of this or source_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of this or source_definition_id is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only failed attempts from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """List failed sync attempts for ALL actors using a source connector type.

    This tool finds all actors with the given connector definition and returns their
    failed sync attempts, regardless of whether they have explicit version pins.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    This is useful for investigating connector issues across all users. Use this when
    you want to find failures for a connector type regardless of which version users
    are on.

    Note: This tool only supports SOURCE connectors. For destination connectors,
    a separate tool would be needed.

    Key fields in results:
    - failure_summary: JSON containing failure details including failureType and messages
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one of the two parameters is provided
    if (source_definition_id is None) == (source_canonical_name is None):
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided, but not both."
            ),
        )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 1000)", default=1000),
    ] = 1000,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude connections whose connector is already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all connections)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """Search for all connections using a specific source or destination connector type.

    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
    It finds all connections where the source or destination connector matches the
    specified type, regardless of how the connector is named by users.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Optionally filter by organization_id to limit results to a specific organization.
    Use '@airbyte-internal' as an alias for the Airbyte internal organization.

    Set exclude_pinned=True to filter out connections that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    For source queries, returns: connection_id, connection_name, connection_url, source_id,
    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
    pin_scope_type, customer_tier, is_eu.
    For destination queries, returns: connection_id, connection_name, connection_url,
    destination_id, destination_name, destination_definition_id, workspace_id,
    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from (NULL if not pinned).
    """
    # Validate that exactly one of the four connector parameters is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a source or destination query and resolve the definition ID
    is_source_query = (
        source_definition_id is not None or source_canonical_name is not None
    )
    resolved_definition_id: str

    if source_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    elif destination_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    else:
        resolved_definition_id = destination_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    # Query the database based on connector type
    if is_source_query:
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "source_id": str(row["source_id"]),
                "source_name": row.get("source_name", ""),
                "source_definition_id": str(row["source_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]
    else:
        # Destination query
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "destination_id": str(row["destination_id"]),
                "destination_name": row.get("destination_name", ""),
                "destination_definition_id": str(row["destination_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_destination_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It's particularly useful when validating
    connector fixes that affect specific streams - you can quickly find
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
            ),
        )

    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        assert source_definition_id is not None
        resolved_definition_id = source_definition_id

    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = [
        {
            "organization_id": str(row.get("organization_id", "")),
            "workspace_id": str(row["workspace_id"]),
            "workspace_name": row.get("workspace_name", ""),
            "connection_id": str(row["connection_id"]),
            "connection_name": row.get("connection_name", ""),
            "connection_status": row.get("connection_status", ""),
            "connection_url": (
                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                f"/connections/{row['connection_id']}/status"
            ),
            "source_id": str(row["source_id"]),
            "source_name": row.get("source_name", ""),
            "source_definition_id": str(row["source_definition_id"]),
            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
            "dataplane_name": row.get("dataplane_name", ""),
        }
        for row in query_connections_by_stream(
            connector_definition_id=resolved_definition_id,
            stream_name=stream_name,
            organization_id=resolved_organization_id,
            limit=limit,
        )
    ]
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspaces_by_email_domain(
    email_domain: Annotated[
        str,
        Field(
            description=(
                "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). "
                "Do not include the '@' symbol. This will find workspaces where users "
                "have email addresses with this domain."
            ),
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of workspaces to return (default: 100)",
            default=100,
        ),
    ] = 100,
) -> WorkspacesByEmailDomainResult:
    """Find workspaces by email domain.

    This tool searches for workspaces where users have email addresses matching
    the specified domain. This is useful for identifying workspaces belonging to
    specific companies - for example, searching for "motherduck.com" will find
    workspaces belonging to MotherDuck employees.

    Use cases:
    - Finding partner organization connections for testing connector fixes
    - Identifying internal test accounts for specific integrations
    - Locating workspaces belonging to technology partners

    The returned organization IDs can be used with other tools like
    `query_prod_connections_by_connector` to find connections within
    those organizations for safe testing.
    """
    # Strip leading @ if provided
    clean_domain = email_domain.lstrip("@")

    # Query the database
    rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)

    # Convert rows to Pydantic models
    workspaces = [
        WorkspaceInfo(
            organization_id=str(row["organization_id"]),
            workspace_id=str(row["workspace_id"]),
            workspace_name=row.get("workspace_name", ""),
            slug=row.get("slug"),
            email=row.get("email"),
            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
            dataplane_name=row.get("dataplane_name"),
            created_at=row.get("created_at"),
        )
        for row in rows
    ]

    # Enrich with tier annotation (annotation only, no filtering)
    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
    for ws in workspaces:
        tier_result = tier_results.get(ws.organization_id)
        if tier_result:
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspacesByEmailDomainResult(
        email_domain=clean_domain,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )


def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
total_unknown += row_unknown 1271 1272 by_version.append( 1273 VersionPinStats( 1274 pinned_version_id=str(version_id) if version_id else None, 1275 docker_image_tag=version_tags.get(str(version_id)) 1276 if version_id 1277 else None, 1278 total_connections=row_total, 1279 enabled_connections=row_enabled, 1280 active_connections=row_active, 1281 latest_attempt=LatestAttemptBreakdown( 1282 succeeded=row_succeeded, 1283 failed=row_failed, 1284 cancelled=row_cancelled, 1285 running=row_running, 1286 unknown=row_unknown, 1287 ), 1288 ) 1289 ) 1290 1291 return ConnectorConnectionStats( 1292 connector_definition_id=connector_definition_id, 1293 connector_type=connector_type, 1294 canonical_name=canonical_name, 1295 total_connections=total_connections, 1296 enabled_connections=enabled_connections, 1297 active_connections=active_connections, 1298 pinned_connections=pinned_connections, 1299 unpinned_connections=unpinned_connections, 1300 latest_attempt=LatestAttemptBreakdown( 1301 succeeded=total_succeeded, 1302 failed=total_failed, 1303 cancelled=total_cancelled, 1304 running=total_running, 1305 unknown=total_unknown, 1306 ), 1307 by_version=by_version, 1308 ) 1309 1310 1311@mcp_tool( 1312 read_only=True, 1313 idempotent=True, 1314 open_world=True, 1315) 1316def query_prod_connector_connection_stats( 1317 source_definition_ids: Annotated[ 1318 list[str] | None, 1319 Field( 1320 description=( 1321 "List of source connector definition IDs (UUIDs) to get stats for. " 1322 "Example: ['afa734e4-3571-11ec-991a-1e0031268139']" 1323 ), 1324 default=None, 1325 ), 1326 ] = None, 1327 destination_definition_ids: Annotated[ 1328 list[str] | None, 1329 Field( 1330 description=( 1331 "List of destination connector definition IDs (UUIDs) to get stats for. 
" 1332 "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']" 1333 ), 1334 default=None, 1335 ), 1336 ] = None, 1337 active_within_days: Annotated[ 1338 int, 1339 Field( 1340 description=( 1341 "Number of days to look back for 'active' connections (default: 7). " 1342 "Connections with sync activity within this window are counted as active." 1343 ), 1344 default=7, 1345 ), 1346 ] = 7, 1347) -> ConnectorConnectionStatsResponse: 1348 """Get aggregate connection stats for multiple connectors. 1349 1350 Returns counts of connections grouped by pinned version for each connector, 1351 including: 1352 - Total, enabled, and active connection counts 1353 - Pinned vs unpinned breakdown 1354 - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown) 1355 1356 This tool is designed for release monitoring workflows. It allows you to: 1357 1. Query recently released connectors to identify which ones to monitor 1358 2. Get aggregate stats showing how many connections are using each version 1359 3. See health metrics (pass/fail) broken down by version 1360 1361 The 'active_within_days' parameter controls the lookback window for: 1362 - Counting 'active' connections (those with recent sync activity) 1363 - Determining 'latest attempt status' (most recent attempt within the window) 1364 1365 Connections with no sync activity in the lookback window will have 1366 'unknown' status in the latest_attempt breakdown. 1367 """ 1368 # Initialize empty lists if None 1369 source_ids = source_definition_ids or [] 1370 destination_ids = destination_definition_ids or [] 1371 1372 if not source_ids and not destination_ids: 1373 raise PyAirbyteInputError( 1374 message=( 1375 "At least one of source_definition_ids or destination_definition_ids " 1376 "must be provided." 
1377 ), 1378 ) 1379 1380 sources: list[ConnectorConnectionStats] = [] 1381 destinations: list[ConnectorConnectionStats] = [] 1382 1383 # Process source connectors 1384 for source_def_id in source_ids: 1385 # Get version info for tag lookup 1386 versions = query_connector_versions(source_def_id) 1387 version_tags = { 1388 str(v["version_id"]): v.get("docker_image_tag") for v in versions 1389 } 1390 1391 # Get aggregate stats 1392 rows = query_source_connection_stats(source_def_id, days=active_within_days) 1393 1394 sources.append( 1395 _build_connector_stats( 1396 connector_definition_id=source_def_id, 1397 connector_type="source", 1398 canonical_name=None, 1399 rows=rows, 1400 version_tags=version_tags, 1401 ) 1402 ) 1403 1404 # Process destination connectors 1405 for dest_def_id in destination_ids: 1406 # Get version info for tag lookup 1407 versions = query_connector_versions(dest_def_id) 1408 version_tags = { 1409 str(v["version_id"]): v.get("docker_image_tag") for v in versions 1410 } 1411 1412 # Get aggregate stats 1413 rows = query_destination_connection_stats(dest_def_id, days=active_within_days) 1414 1415 destinations.append( 1416 _build_connector_stats( 1417 connector_definition_id=dest_def_id, 1418 connector_type="destination", 1419 canonical_name=None, 1420 rows=rows, 1421 version_tags=version_tags, 1422 ) 1423 ) 1424 1425 return ConnectorConnectionStatsResponse( 1426 sources=sources, 1427 destinations=destinations, 1428 active_within_days=active_within_days, 1429 generated_at=datetime.now(timezone.utc), 1430 ) 1431 1432 1433# ============================================================================= 1434# Connector Rollout Models and Tools 1435# ============================================================================= 1436 1437 1438class ConnectorRolloutInfo(BaseModel): 1439 """Information about a connector rollout.""" 1440 1441 rollout_id: str = Field(description="The rollout UUID") 1442 actor_definition_id: str = Field(description="The 
connector definition UUID") 1443 state: str = Field( 1444 description="Rollout state: initialized, workflow_started, in_progress, " 1445 "paused, finalizing, succeeded, errored, failed_rolled_back, canceled" 1446 ) 1447 initial_rollout_pct: int | None = Field( 1448 default=None, description="Initial rollout percentage" 1449 ) 1450 current_target_rollout_pct: int | None = Field( 1451 default=None, description="Current target rollout percentage" 1452 ) 1453 final_target_rollout_pct: int | None = Field( 1454 default=None, description="Final target rollout percentage" 1455 ) 1456 has_breaking_changes: bool = Field( 1457 description="Whether the RC has breaking changes" 1458 ) 1459 max_step_wait_time_mins: int | None = Field( 1460 default=None, description="Maximum wait time between rollout steps in minutes" 1461 ) 1462 rollout_strategy: str | None = Field( 1463 default=None, description="Rollout strategy: manual, automated, overridden" 1464 ) 1465 workflow_run_id: str | None = Field( 1466 default=None, description="Temporal workflow run ID" 1467 ) 1468 error_msg: str | None = Field(default=None, description="Error message if errored") 1469 failed_reason: str | None = Field( 1470 default=None, description="Reason for failure if failed" 1471 ) 1472 paused_reason: str | None = Field( 1473 default=None, description="Reason for pause if paused" 1474 ) 1475 tag: str | None = Field(default=None, description="Optional tag for the rollout") 1476 created_at: datetime | None = Field( 1477 default=None, description="When the rollout was created" 1478 ) 1479 updated_at: datetime | None = Field( 1480 default=None, description="When the rollout was last updated" 1481 ) 1482 completed_at: datetime | None = Field( 1483 default=None, description="When the rollout completed (if terminal)" 1484 ) 1485 expires_at: datetime | None = Field( 1486 default=None, description="When the rollout expires" 1487 ) 1488 rc_docker_image_tag: str | None = Field( 1489 default=None, description="Docker 
image tag of the release candidate" 1490 ) 1491 rc_docker_repository: str | None = Field( 1492 default=None, description="Docker repository of the release candidate" 1493 ) 1494 initial_docker_image_tag: str | None = Field( 1495 default=None, description="Docker image tag of the initial version" 1496 ) 1497 initial_docker_repository: str | None = Field( 1498 default=None, description="Docker repository of the initial version" 1499 ) 1500 filters: dict[str, Any] | None = Field( 1501 default=None, 1502 description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})", 1503 ) 1504 customer_tier: str | None = Field( 1505 default=None, 1506 description="Customer tier targeted by this rollout (extracted from filters), " 1507 "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.", 1508 ) 1509 1510 1511def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None: 1512 """Parse the rollout filters field from a database row. 1513 1514 The filters field may be a JSON string, a dict, or None. 1515 """ 1516 if filters_raw is None: 1517 return None 1518 if isinstance(filters_raw, dict): 1519 return filters_raw 1520 if isinstance(filters_raw, str): 1521 try: 1522 parsed = json.loads(filters_raw) 1523 if isinstance(parsed, dict): 1524 return parsed 1525 except (json.JSONDecodeError, TypeError): 1526 pass 1527 return None 1528 1529 1530def _extract_tier_from_filters(filters_raw: Any) -> str | None: 1531 """Extract customer tier from rollout filters JSON. 
1532 1533 Supports two formats: 1534 - Legacy: `{"tierFilter": {"tier": "TIER_0"}}` 1535 - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}` 1536 """ 1537 parsed = _parse_rollout_filters(filters_raw) 1538 if parsed is None: 1539 return None 1540 1541 # Current format: customerTierFilters list 1542 tier_filters = parsed.get("customerTierFilters") 1543 if isinstance(tier_filters, list): 1544 for entry in tier_filters: 1545 if isinstance(entry, dict) and entry.get("name") == "TIER": 1546 values = entry.get("value") 1547 if isinstance(values, list) and len(values) == 1: 1548 return str(values[0]) 1549 if isinstance(values, list) and len(values) > 1: 1550 return ", ".join(str(v) for v in values) 1551 1552 # Legacy format: tierFilter dict 1553 tier_filter = parsed.get("tierFilter") 1554 if isinstance(tier_filter, dict): 1555 tier = tier_filter.get("tier") 1556 if isinstance(tier, str): 1557 return tier 1558 1559 return None 1560 1561 1562def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo: 1563 """Convert a database row to a ConnectorRolloutInfo model.""" 1564 return ConnectorRolloutInfo( 1565 rollout_id=str(row["rollout_id"]), 1566 actor_definition_id=str(row["actor_definition_id"]), 1567 state=row["state"], 1568 initial_rollout_pct=row.get("initial_rollout_pct"), 1569 current_target_rollout_pct=row.get("current_target_rollout_pct"), 1570 final_target_rollout_pct=row.get("final_target_rollout_pct"), 1571 has_breaking_changes=row["has_breaking_changes"], 1572 max_step_wait_time_mins=row.get("max_step_wait_time_mins"), 1573 rollout_strategy=row.get("rollout_strategy"), 1574 workflow_run_id=row.get("workflow_run_id"), 1575 error_msg=row.get("error_msg"), 1576 failed_reason=row.get("failed_reason"), 1577 paused_reason=row.get("paused_reason"), 1578 tag=row.get("tag"), 1579 created_at=row.get("created_at"), 1580 updated_at=row.get("updated_at"), 1581 completed_at=row.get("completed_at"), 1582 
expires_at=row.get("expires_at"), 1583 rc_docker_image_tag=row.get("rc_docker_image_tag"), 1584 rc_docker_repository=row.get("rc_docker_repository"), 1585 initial_docker_image_tag=row.get("initial_docker_image_tag"), 1586 initial_docker_repository=row.get("initial_docker_repository"), 1587 filters=_parse_rollout_filters(row.get("filters")), 1588 customer_tier=_extract_tier_from_filters(row.get("filters")), 1589 ) 1590 1591 1592@mcp_tool( 1593 read_only=True, 1594 idempotent=True, 1595) 1596def query_prod_connector_rollouts( 1597 actor_definition_id: Annotated[ 1598 str | None, 1599 Field(description="Connector definition UUID to filter by (optional)"), 1600 ] = None, 1601 rollout_id: Annotated[ 1602 str | None, 1603 Field(description="Specific rollout UUID to look up (optional)"), 1604 ] = None, 1605 active_only: Annotated[ 1606 bool, 1607 Field(description="If true, only return active (non-terminal) rollouts"), 1608 ] = False, 1609 limit: Annotated[ 1610 int, 1611 Field(description="Maximum number of results (default: 100)"), 1612 ] = 100, 1613) -> list[ConnectorRolloutInfo]: 1614 """Query connector rollouts with flexible filtering. 1615 1616 Returns rollouts based on the provided filters. If no filters are specified, 1617 returns all active rollouts. Useful for monitoring rollout status and history. 
1618 1619 Filter behavior: 1620 - rollout_id: Returns that specific rollout (ignores other filters) 1621 - active_only: Returns only active (non-terminal) rollouts 1622 - actor_definition_id: Returns rollouts for that specific connector 1623 - No filters: Returns all active rollouts (same as active_only=True) 1624 """ 1625 rows = query_connector_rollouts( 1626 actor_definition_id=actor_definition_id, 1627 rollout_id=rollout_id, 1628 active_only=active_only, 1629 limit=limit, 1630 ) 1631 return [_row_to_connector_rollout_info(row) for row in rows] 1632 1633 1634def register_prod_db_query_tools(app: FastMCP) -> None: 1635 """Register prod DB query tools with the FastMCP app.""" 1636 register_mcp_tools(app, mcp_module=__name__)
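The two rollout filter formats handled by `_extract_tier_from_filters` can be tried out in isolation. The following is a minimal standalone sketch, not the module's own code: `parse_filters` and `extract_tier` are hypothetical stand-ins that mirror the private helpers in the listing so the example runs without a database connection or the rest of the package. The sample payloads are taken from the docstring, and the multi-tier payload is an assumed illustration.

```python
from __future__ import annotations

import json
from typing import Any


def parse_filters(raw: Any) -> dict[str, Any] | None:
    """Hypothetical stand-in: accept a dict, a JSON string, or None."""
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None
    return None


def extract_tier(raw: Any) -> str | None:
    """Hypothetical stand-in: return the targeted tier(s), or None."""
    parsed = parse_filters(raw)
    if parsed is None:
        return None

    # Current format: a list of filter entries; the one named "TIER" wins.
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and values:
                    # One value yields "TIER_1"; several are comma-joined.
                    return ", ".join(str(v) for v in values)

    # Legacy format: a single nested dict.
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict) and isinstance(tier_filter.get("tier"), str):
        return tier_filter["tier"]

    return None


legacy = {"tierFilter": {"tier": "TIER_0"}}
current = '{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}'
multi = {"customerTierFilters": [{"name": "TIER", "value": ["TIER_1", "TIER_2"], "operator": "IN"}]}

for payload in (legacy, current, multi, "not json", None):
    print(repr(extract_tier(payload)))
```

The current format is checked first, so a row carrying both keys reports the `customerTierFilters` value. Malformed or non-dict input yields `None` rather than raising, which matches the tolerant behavior of the helpers in the listing.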