airbyte_ops_mcp.mcp.prod_db_queries

MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

MCP reference

MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 13 tool(s), 0 prompt(s), 0 resource(s).

Tools (13)

query_prod_actors_by_pinned_connector_version

Hints: read-only · idempotent

List actors (sources/destinations) effectively pinned to a specific connector version.

Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.

The actor_id field holds the actor ID. Because an actor is either a source or a destination, actor_id values cover both source IDs and destination IDs.

Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
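The actor > workspace > organization precedence can be sketched in Python as follows. This is a minimal illustration of the rule, not the server's SQL. The function name `effective_pin` and the representation of each scope's pin as an optional version ID are assumptions made for the example.

```python
from typing import Optional, Tuple


def effective_pin(
    actor_pin: Optional[str],
    workspace_pin: Optional[str],
    organization_pin: Optional[str],
) -> Tuple[Optional[str], Optional[str]]:
    """Return (pinned_version_id, pin_scope_type) for one actor.

    Hypothetical helper: scopes are checked from most to least specific,
    so an actor-level pin wins over workspace and organization pins.
    """
    for scope, pin in (
        ("actor", actor_pin),
        ("workspace", workspace_pin),
        ("organization", organization_pin),
    ):
        if pin is not None:
            return pin, scope
    # No pin at any scope: the actor follows the default version.
    return None, None


# An actor with only an organization-level pin inherits it...
print(effective_pin(None, None, "v-org"))      # ('v-org', 'organization')
# ...but an actor-level pin overrides a workspace-level pin.
print(effective_pin("v-actor", "v-ws", None))  # ('v-actor', 'actor')
```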

Parameters:

  • connector_version_id (string, required): Connector version UUID to find pinned instances for.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find pinned instances for",
      "type": "string"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}
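An MCP client invokes this tool with a JSON-RPC `tools/call` request whose `arguments` object must satisfy the input schema above. The sketch below builds such a request with the standard library. The helper name, the request `id`, and the all-zero placeholder UUID are illustrative. The UUID check is a client-side precaution assumed here; the schema itself only requires a string.

```python
import json
import uuid


def build_tools_call(request_id: int, connector_version_id: str) -> dict:
    """Build a tools/call request for query_prod_actors_by_pinned_connector_version.

    Hypothetical helper for illustration only.
    """
    # Client-side precaution (assumption): reject malformed IDs early.
    uuid.UUID(connector_version_id)  # raises ValueError if not a valid UUID
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "query_prod_actors_by_pinned_connector_version",
            # additionalProperties is false, so send only the declared key.
            "arguments": {"connector_version_id": connector_version_id},
        },
    }


request = build_tools_call(1, "00000000-0000-0000-0000-000000000000")
print(json.dumps(request, indent=2))
```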

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
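Because the output schema wraps the list in a `result` key (the `x-fastmcp-wrap-result` flag), client code reads the rows from that key. This minimal sketch counts actors per pin scope. The helper name and the trimmed sample payload are hypothetical.

```python
from collections import Counter


def count_by_pin_scope(payload: dict) -> Counter:
    """Count actors per pin_scope_type in a wrapped tool result (hypothetical helper)."""
    # The list of actor dicts lives under the "result" key.
    rows = payload["result"]
    return Counter(row.get("pin_scope_type") for row in rows)


# Hypothetical payload trimmed to the fields used here.
payload = {
    "result": [
        {"actor_id": "a1", "pin_scope_type": "actor"},
        {"actor_id": "a2", "pin_scope_type": "organization"},
        {"actor_id": "a3", "pin_scope_type": "organization"},
    ]
}
print(count_by_pin_scope(payload))  # Counter({'organization': 2, 'actor': 1})
```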

query_prod_connections_by_connector

Hints: read-only · idempotent · open-world

Search for all connections using a specific source or destination connector type.

This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.

Results are always enriched with customer_tier and is_eu fields. Querying is always tier-aware: customer_tier_filter defaults to 'TIER_2' when omitted, and 'ALL' includes every tier.

Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.

Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs. Both query types return connection_id, connection_name, connection_url, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, and is_eu, plus connector-specific fields:

  • Source queries: source_id, source_name, source_definition_id
  • Destination queries: destination_id, destination_name, destination_definition_id

pin_scope_type is 'actor', 'workspace', or 'organization', indicating which scope level the effective pin came from. It is null if the connection is not pinned.
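pin_scope_type is null for unpinned connections. A client that fetched with exclude_pinned left at false can therefore still split the results itself. The sketch below counts pinned and unpinned connections per customer_tier. The helper name and sample rows are hypothetical. It also assumes that customer_tier values use the same names as the customer_tier_filter enum.

```python
from collections import defaultdict


def pin_summary_by_tier(connections: list[dict]) -> dict[str, dict[str, int]]:
    """Count pinned vs unpinned connections for each customer_tier (hypothetical helper)."""
    summary: dict[str, dict[str, int]] = defaultdict(lambda: {"pinned": 0, "unpinned": 0})
    for conn in connections:
        # pin_scope_type is None (JSON null) when no pin applies at any scope.
        key = "unpinned" if conn.get("pin_scope_type") is None else "pinned"
        summary[conn["customer_tier"]][key] += 1
    return dict(summary)


# Hypothetical rows trimmed to the fields used here.
rows = [
    {"connection_id": "c1", "customer_tier": "TIER_2", "pin_scope_type": None},
    {"connection_id": "c2", "customer_tier": "TIER_2", "pin_scope_type": "workspace"},
    {"connection_id": "c3", "customer_tier": "TIER_1", "pin_scope_type": None},
]
print(pin_summary_by_tier(rows))
```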

Parameters:

Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required.

  • source_definition_id (string | null, optional, default null): Source connector definition ID (UUID) to search for. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
  • source_canonical_name (string | null, optional, default null): Canonical source connector name to search for. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
  • destination_definition_id (string | null, optional, default null): Destination connector definition ID (UUID) to search for. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB.
  • destination_canonical_name (string | null, optional, default null): Canonical destination connector name to search for. Examples: 'destination-duckdb', 'DuckDB'.
  • organization_id (string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null, optional, default null): Organization ID (UUID) or alias to filter results. If provided, only connections in this organization are returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
  • limit (integer, optional, default 1000): Maximum number of results.
  • customer_tier_filter (enum("TIER_0", "TIER_1", "TIER_2", "ALL"), optional, default "TIER_2"): Tier filter. Restricts results to connections belonging to organizations in the specified tier; use 'ALL' to include all tiers.
  • exclude_pinned (boolean, optional, default false): If true, exclude connections whose connector is already pinned to a specific version at any scope level (actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. When false, all connections are included.
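The sketch below assembles arguments for this tool on the client side. It enforces the "exactly one connector identifier" rule and the tier enum. It also resolves the '@airbyte-internal' alias using the mapping in the organization_id enum. The server already accepts the alias, so resolving it locally is optional. The helper name and the local alias table are hypothetical.

```python
from typing import Optional

# Hypothetical client-side copy of the alias enum (alias name -> organization UUID).
ORG_ALIASES = {"@airbyte-internal": "664c690e-5263-49ba-b01f-4a6759b3330a"}

CONNECTOR_KEYS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)
TIERS = {"TIER_0", "TIER_1", "TIER_2", "ALL"}


def build_connections_by_connector_args(
    customer_tier_filter: str = "TIER_2",
    organization_id: Optional[str] = None,
    exclude_pinned: bool = False,
    limit: int = 1000,
    **connector: Optional[str],
) -> dict:
    """Validate and assemble arguments for query_prod_connections_by_connector."""
    given = {k: v for k, v in connector.items() if v is not None}
    unknown = set(given) - set(CONNECTOR_KEYS)
    if unknown:
        raise TypeError(f"unexpected arguments: {sorted(unknown)}")
    if len(given) != 1:
        raise ValueError(f"exactly one of {CONNECTOR_KEYS} is required, got {sorted(given)}")
    if customer_tier_filter not in TIERS:
        raise ValueError(f"invalid customer_tier_filter: {customer_tier_filter!r}")
    args = {
        **given,
        "customer_tier_filter": customer_tier_filter,
        "exclude_pinned": exclude_pinned,
        "limit": limit,
    }
    if organization_id is not None:
        # Resolve a known alias to its UUID; plain UUIDs pass through unchanged.
        args["organization_id"] = ORG_ALIASES.get(organization_id, organization_id)
    return args


args = build_connections_by_connector_args(
    destination_canonical_name="destination-duckdb",
    customer_tier_filter="ALL",
    exclude_pinned=True,
)
print(args)
```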

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of results (default: 1000)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_stream

Hints: read-only · idempotent · open-world

Find connections that have a specific stream enabled in their catalog.

This tool searches the connection's configured catalog (JSONB) for streams matching the specified name. It is particularly useful when validating connector fixes that affect specific streams: you can quickly find customer connections that use the affected stream.
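The catalog search can be pictured with the following Python sketch. It does not reproduce the SQL the tool runs. It assumes the stored catalog follows the Airbyte protocol's ConfiguredAirbyteCatalog shape, where a `streams` list holds entries that each carry `stream.name`. The function name and sample catalog are hypothetical.

```python
import json


def catalog_has_stream(catalog_json: str, stream_name: str) -> bool:
    """Return True if the configured catalog contains a stream with exactly this name.

    Hypothetical helper; assumes the ConfiguredAirbyteCatalog JSON shape.
    """
    catalog = json.loads(catalog_json)
    # Exact match on the stream name, as the stream_name parameter requires.
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )


catalog = json.dumps({
    "streams": [
        {"stream": {"name": "campaigns"}, "sync_mode": "incremental"},
        {"stream": {"name": "global_exclusions"}, "sync_mode": "full_refresh"},
    ]
})
print(catalog_has_stream(catalog, "global_exclusions"))  # True
print(catalog_has_stream(catalog, "Campaigns"))          # False
```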

Results are always enriched with customer_tier and is_eu fields. Querying is always tier-aware: customer_tier_filter defaults to 'TIER_2' when omitted, and 'ALL' includes every tier.

Use cases:

  • Finding connections with a specific stream enabled for regression testing
  • Validating connector fixes that affect particular streams
  • Identifying which customers use rarely-enabled streams

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.

Parameters:

Exactly one of source_definition_id or source_canonical_name is required.

  • stream_name (string, required): Name of the stream to search for in connection catalogs. Must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.
  • source_definition_id (string | null, optional, default null): Source connector definition ID (UUID) to search for. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
  • source_canonical_name (string | null, optional, default null): Canonical source connector name to search for. Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'.
  • organization_id (string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null, optional, default null): Organization ID (UUID) or alias to filter results. If provided, only connections in this organization are returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
  • limit (integer, optional, default 100): Maximum number of results.
  • customer_tier_filter (enum("TIER_0", "TIER_1", "TIER_2", "ALL"), optional, default "TIER_2"): Tier filter. Restricts results to connections belonging to organizations in the specified tier; use 'ALL' to include all tiers.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "stream_name": {
      "description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
      "type": "string"
    },
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "stream_name"
  ],
  "type": "object"
}

Output JSON schema:

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connector_connection_stats

Hints: read-only · idempotent · open-world

Get aggregate connection stats for multiple connectors.

Returns counts of connections grouped by pinned version for each connector, including:

  • Total, enabled, and active connection counts
  • Pinned vs unpinned breakdown
  • Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)
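As a sketch of how the latest-attempt breakdown might be reduced to a single health number, the helper below computes a success rate over finished attempts. The tool does not compute this metric itself; the definition is an assumption of this example. Running and unknown connections are left out of the denominator, and cancelled attempts count as not succeeded. Missing keys fall back to 0, matching the schema defaults.

```python
from typing import Optional


def success_rate(latest_attempt: dict) -> Optional[float]:
    """Share of finished latest attempts that succeeded (hypothetical metric).

    Returns None when no connection has a finished latest attempt.
    """
    succeeded = latest_attempt.get("succeeded", 0)
    finished = (
        succeeded
        + latest_attempt.get("failed", 0)
        + latest_attempt.get("cancelled", 0)
    )
    return succeeded / finished if finished else None


print(success_rate({"succeeded": 45, "failed": 5, "running": 2, "unknown": 10}))  # 0.9
```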

This tool is designed for release monitoring workflows. A typical workflow:

  1. Identify recently released connectors to monitor
  2. Use this tool to get aggregate stats showing how many connections are using each version
  3. Compare health metrics (pass/fail) broken down by version

The 'active_within_days' parameter controls the lookback window for:

  • Counting 'active' connections (those with recent sync activity)
  • Determining 'latest attempt status' (most recent attempt within the window)

Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
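The lookback rule above can be modeled per connection as follows. The helper name is hypothetical, and so is the input shape: a last-attempt timestamp plus a status already normalized to the bucket names. Anything older than `active_within_days`, or never attempted, lands in 'unknown'.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Assumption: raw attempt statuses are already normalized to these bucket names.
ATTEMPT_BUCKETS = {"succeeded", "failed", "cancelled", "running"}


def latest_attempt_bucket(
    last_attempt_at: Optional[datetime],
    last_status: Optional[str],
    now: datetime,
    active_within_days: int = 7,
) -> str:
    """Classify one connection into a latest_attempt bucket (hypothetical helper)."""
    window_start = now - timedelta(days=active_within_days)
    if last_attempt_at is None or last_attempt_at < window_start:
        # No attempt inside the lookback window, so the status is unknown.
        return "unknown"
    if last_status not in ATTEMPT_BUCKETS:
        raise ValueError(f"unexpected attempt status: {last_status!r}")
    return last_status


now = datetime(2024, 6, 10, tzinfo=timezone.utc)
print(latest_attempt_bucket(datetime(2024, 6, 8, tzinfo=timezone.utc), "failed", now))     # failed
print(latest_attempt_bucket(datetime(2024, 5, 1, tzinfo=timezone.utc), "succeeded", now))  # unknown
```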

Parameters:

  • source_definition_ids (array<string> | null, optional, default null): List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']
  • destination_definition_ids (array<string> | null, optional, default null): List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']
  • active_within_days (integer, optional, default 7): Number of days to look back for 'active' connections. Connections with sync activity within this window are counted as active, and the same window determines the latest attempt status.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "source_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
    },
    "destination_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
    },
    "active_within_days": {
      "default": 7,
      "description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
      "type": "integer"
    }
  },
  "type": "object"
}

Output JSON schema:

{
  "description": "Response containing connection stats for multiple connectors.",
  "properties": {
    "sources": {
      "description": "Stats for source connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "destinations": {
      "description": "Stats for destination connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "active_within_days": {
      "description": "Lookback window used for 'active' connections",
      "type": "integer"
    },
    "generated_at": {
      "description": "When this response was generated",
      "format": "date-time",
      "type": "string"
    }
  },
  "required": [
    "active_within_days",
    "generated_at"
  ],
  "type": "object"
}
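A release-monitoring consumer of this response might compare each pinned version against the unpinned bucket. In by_version, that bucket is the entry whose pinned_version_id is null. The sketch below flags pinned versions whose failure rate exceeds that baseline by more than a margin. The helper names, the failure-rate definition (failed over succeeded + failed + cancelled), the 0.05 margin, and the trimmed sample response are all assumptions. Since sources and destinations are not listed as required in the schema, the code tolerates their absence.

```python
from typing import Optional


def failure_rate(breakdown: dict) -> Optional[float]:
    """failed / (succeeded + failed + cancelled); None if nothing finished."""
    failed = breakdown.get("failed", 0)
    finished = failed + breakdown.get("succeeded", 0) + breakdown.get("cancelled", 0)
    return failed / finished if finished else None


def flag_regressed_versions(response: dict, margin: float = 0.05) -> list[tuple[str, str, float]]:
    """Return (connector_definition_id, version label, failure_rate) for suspicious versions."""
    flagged = []
    connectors = (response.get("sources") or []) + (response.get("destinations") or [])
    for connector in connectors:
        versions = connector["by_version"]
        # The unpinned (default-version) bucket serves as the baseline.
        baseline = next((v for v in versions if v["pinned_version_id"] is None), None)
        base_rate = failure_rate(baseline["latest_attempt"]) if baseline else None
        if base_rate is None:
            continue  # nothing to compare against
        for v in versions:
            if v["pinned_version_id"] is None:
                continue
            rate = failure_rate(v["latest_attempt"])
            if rate is not None and rate > base_rate + margin:
                label = v.get("docker_image_tag") or v["pinned_version_id"]
                flagged.append((connector["connector_definition_id"], label, rate))
    return flagged


# Hypothetical response trimmed to the fields used here.
response = {
    "active_within_days": 7,
    "generated_at": "2024-06-10T00:00:00Z",
    "sources": [{
        "connector_definition_id": "src-1",
        "by_version": [
            {"pinned_version_id": None, "latest_attempt": {"succeeded": 95, "failed": 5}},
            {"pinned_version_id": "v-rc", "docker_image_tag": "1.2.0-rc.1",
             "latest_attempt": {"succeeded": 8, "failed": 2}},
            {"pinned_version_id": "v-old", "docker_image_tag": "1.1.0",
             "latest_attempt": {"succeeded": 20, "failed": 1}},
        ],
    }],
}
print(flag_regressed_versions(response))  # [('src-1', '1.2.0-rc.1', 0.2)]
```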

query_prod_connector_rollouts

Hints: read-only · idempotent

Query connector rollouts with flexible filtering.

Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.

Filter behavior:

  • rollout_id: Returns that specific rollout (ignores other filters)
  • active_only: Returns only active (non-terminal) rollouts
  • actor_definition_id: Returns rollouts for that specific connector
  • No filters: Returns all active rollouts (same as active_only=True)
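The filter rules above can be modeled over an in-memory list of rollout dicts. This is a hypothetical client-side sketch, not the server's query. This reference does not say which states are terminal. The TERMINAL_STATES set is a guess based on the state names in the output schema, and it treats 'errored' as non-terminal.

```python
from typing import Optional

# Assumption: terminal states guessed from the state names in the output schema;
# 'errored' is treated as non-terminal here.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def select_rollouts(
    rollouts: list[dict],
    actor_definition_id: Optional[str] = None,
    rollout_id: Optional[str] = None,
    active_only: bool = False,
    limit: int = 100,
) -> list[dict]:
    """Apply the documented filter behavior to a list of rollouts (hypothetical helper)."""
    if rollout_id is not None:
        # A specific rollout ID overrides every other filter.
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:1]
    if actor_definition_id is None and not active_only:
        # No filters at all behaves like active_only=True.
        active_only = True
    selected = [
        r for r in rollouts
        if (actor_definition_id is None or r["actor_definition_id"] == actor_definition_id)
        and (not active_only or r["state"] not in TERMINAL_STATES)
    ]
    return selected[:limit]


rollouts = [
    {"rollout_id": "r1", "actor_definition_id": "d1", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "d1", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "d2", "state": "paused"},
]
print([r["rollout_id"] for r in select_rollouts(rollouts)])                            # ['r1', 'r3']
print([r["rollout_id"] for r in select_rollouts(rollouts, actor_definition_id="d1")])  # ['r1', 'r2']
```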

Parameters:

  • actor_definition_id (string | null, optional, default null): Connector definition UUID to filter by.
  • rollout_id (string | null, optional, default null): Specific rollout UUID to look up.
  • active_only (boolean, optional, default false): If true, only return active (non-terminal) rollouts.
  • limit (integer, optional, default 100): Maximum number of results.

Input JSON schema:

{
  "additionalProperties": false,
  "properties": {
    "actor_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector definition UUID to filter by (optional)"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specific rollout UUID to look up (optional)"
    },
    "active_only": {
      "default": false,
      "description": "If true, only return active (non-terminal) rollouts",
      "type": "boolean"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a connector rollout.",
        "properties": {
          "rollout_id": {
            "description": "The rollout UUID",
            "type": "string"
          },
          "actor_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "state": {
            "description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
            "type": "string"
          },
          "initial_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Initial rollout percentage"
          },
          "current_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Current target rollout percentage"
          },
          "final_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Final target rollout percentage"
          },
          "has_breaking_changes": {
            "description": "Whether the RC has breaking changes",
            "type": "boolean"
          },
          "max_step_wait_time_mins": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Maximum wait time between rollout steps in minutes"
          },
          "rollout_strategy": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Rollout strategy: manual, automated, overridden"
          },
          "workflow_run_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Temporal workflow run ID"
          },
          "error_msg": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Error message if errored"
          },
          "failed_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for failure if failed"
          },
          "paused_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for pause if paused"
          },
          "tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Optional tag for the rollout"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was created"
          },
          "updated_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was last updated"
          },
          "completed_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout completed (if terminal)"
          },
          "expires_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout expires"
          },
          "rc_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the release candidate"
          },
          "rc_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the release candidate"
          },
          "initial_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the initial version"
          },
          "initial_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the initial version"
          },
          "filters": {
            "anyOf": [
              {
                "additionalProperties": true,
                "type": "object"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
          }
        },
        "required": [
          "rollout_id",
          "actor_definition_id",
          "state",
          "has_breaking_changes"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
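The `customer_tier` field in the rollout output is described as being extracted from the raw `filters` JSON, for example `{'tierFilter': {'tier': 'TIER_0'}}`. The sketch below shows one way to do that extraction. `extract_customer_tier` is a hypothetical helper, and it assumes the `tierFilter.tier` shape from the example is the only place a tier appears.

```python
from typing import Any, Optional


def extract_customer_tier(filters: Optional[dict[str, Any]]) -> Optional[str]:
    """Pull the targeted tier out of a rollout's filters (hypothetical sketch)."""
    # Rollouts without filters target no specific tier.
    if not filters:
        return None
    tier_filter = filters.get("tierFilter")
    # Assumption: the tier lives under filters["tierFilter"]["tier"].
    if not isinstance(tier_filter, dict):
        return None
    tier = tier_filter.get("tier")
    return tier if isinstance(tier, str) else None
```

Returning `None` for anything unexpected mirrors the schema, where `customer_tier` is nullable and is "None if no tier filter is set."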

query_prod_connector_versions

Hints: read-only · idempotent

List all versions for a connector definition.

Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.

Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
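A common follow-up is to look up the `version_id` for a given `docker_image_tag`, for example before pinning or monitoring a rollout. The sketch below works on the list of dicts this tool returns. `find_version_id` is a hypothetical helper, and the sample rows in the test are made up.

```python
from typing import Any, Optional


def find_version_id(versions: list[dict[str, Any]], docker_image_tag: str) -> Optional[str]:
    """Return the version_id whose docker_image_tag matches, or None (hypothetical sketch)."""
    # Rows arrive ordered by last_published descending, so the first match
    # is the most recently published row carrying this tag.
    for row in versions:
        if row.get("docker_image_tag") == docker_image_tag:
            return row.get("version_id")
    return None
```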

Parameters:

Name Type Required Default Description
connector_definition_id string yes Connector definition UUID to list versions for

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_definition_id": {
      "description": "Connector definition UUID to list versions for",
      "type": "string"
    }
  },
  "required": [
    "connector_definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_dataplanes

Hints: read-only · idempotent

List all dataplane groups with workspace counts.

Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).

Returns list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
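To turn these rows into a regional distribution, sum `workspace_count` per `dataplane_name` and divide by the total. The sketch assumes that several dataplane groups (for example, per-organization groups) can share a name, so it aggregates by name. `workspace_share_by_dataplane` is a hypothetical helper.

```python
from typing import Any


def workspace_share_by_dataplane(rows: list[dict[str, Any]]) -> dict[str, float]:
    """Fraction of workspaces per dataplane name (hypothetical sketch)."""
    totals: dict[str, int] = {}
    for row in rows:
        # Groups sharing a name (e.g. several "EU" groups) are summed together.
        name = row.get("dataplane_name") or "unknown"
        totals[name] = totals.get(name, 0) + int(row.get("workspace_count") or 0)
    grand_total = sum(totals.values())
    if grand_total == 0:
        return {name: 0.0 for name in totals}
    return {name: count / grand_total for name, count in totals.items()}
```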

Parameters:

No parameters.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_failed_sync_attempts_for_connector

Hints: read-only · idempotent · open-world

List failed sync attempts for ALL actors using a source connector type.

This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.

Results are always enriched with customer_tier and is_eu fields. Queries are always tier-aware: customer_tier_filter defaults to 'TIER_2', and 'ALL' includes every tier.

This is useful for investigating connector issues across all users. Use this when you want to find failures for a connector type regardless of which version users are on.

Note: This tool only supports SOURCE connectors. For destination connectors, query_prod_recent_syncs_for_connector with status_filter='failed' returns failed sync jobs (not individual attempts).

Key fields in results:

  • failure_summary: JSON containing failure details including failureType and messages
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
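To triage the failed attempts, you can tally them by `failureType` from `failure_summary`. The sketch below assumes that `failure_summary` holds a `failures` list whose entries carry a `failureType` key, and that it may arrive either as a JSON string or as a decoded dict. Both points are assumptions, so check them against real rows. `count_failure_types` is a hypothetical helper.

```python
import json
from collections import Counter
from typing import Any


def count_failure_types(attempts: list[dict[str, Any]]) -> Counter:
    """Tally failureType values across failed attempts (hypothetical sketch)."""
    counts: Counter = Counter()
    for attempt in attempts:
        summary = attempt.get("failure_summary")
        # Assumption: the column may be a JSON string or an already-decoded dict.
        if isinstance(summary, str):
            summary = json.loads(summary)
        if not isinstance(summary, dict):
            counts["<no summary>"] += 1
            continue
        # Assumption: failures are listed under summary["failures"].
        for failure in summary.get("failures", []):
            counts[failure.get("failureType") or "<unknown>"] += 1
    return counts
```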

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_new_connector_releases

Hints: read-only · idempotent

List recently published connector versions.

Returns connector versions published within the specified number of days. Uses the last_published timestamp, which reflects when the version was actually deployed to the registry (not the changelog date).

Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
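The `days` window compares `last_published` against "now minus N days". The sketch below applies the same test on the client side, for example to narrow a 7-day result to 2 days without querying again. It assumes timestamps are ISO 8601 strings or `datetime` objects, and that naive timestamps are in UTC. `published_within` is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Optional


def published_within(
    rows: list[dict[str, Any]], days: int, now: Optional[datetime] = None
) -> list[dict[str, Any]]:
    """Keep rows whose last_published falls inside the lookback window (hypothetical sketch)."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    recent = []
    for row in rows:
        published = row.get("last_published")
        if published is None:
            continue
        if isinstance(published, str):
            published = datetime.fromisoformat(published)
        # Assumption: naive timestamps are UTC.
        if published.tzinfo is None:
            published = published.replace(tzinfo=timezone.utc)
        if published >= cutoff:
            recent.append(row)
    return recent
```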

Parameters:

Name Type Required Default Description
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_recent_syncs_for_connector

Hints: read-only · idempotent · open-world

List recent sync jobs for ALL actors using a connector type.

This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.

Results are always enriched with customer_tier and is_eu fields. Queries are always tier-aware: customer_tier_filter defaults to 'TIER_2', and 'ALL' includes every tier.

Use this tool to:

  • Find healthy connections with recent successful syncs (status_filter='succeeded')
  • Investigate connector issues across all users (status_filter='failed')
  • Get an overview of all recent sync activity (status_filter='all')

Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
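The "exactly one connector selector" rule can be checked on the client side before calling the tool. The sketch below validates the selector and then builds an MCP `tools/call` JSON-RPC request for a 'prove fix' style query. The helper name `build_recent_syncs_call` is hypothetical, and the transport (how the message reaches the server) is out of scope.

```python
import json
from typing import Any

SELECTOR_KEYS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def build_recent_syncs_call(request_id: int, **arguments: Any) -> str:
    """Validate the selector and return a JSON-RPC tools/call message (hypothetical sketch)."""
    provided = [key for key in SELECTOR_KEYS if arguments.get(key) is not None]
    if len(provided) != 1:
        raise ValueError(f"Provide exactly one of {SELECTOR_KEYS}; got {provided}")
    # Drop None values so the server-side defaults apply.
    clean = {key: value for key, value in arguments.items() if value is not None}
    payload = {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {
            "name": "query_prod_recent_syncs_for_connector",
            "arguments": clean,
        },
    }
    return json.dumps(payload)
```

For example, `build_recent_syncs_call(1, source_canonical_name="source-youtube-analytics", status_filter="succeeded", exclude_pinned=True)` asks for healthy, unpinned syncs. Because `customer_tier_filter` is omitted, it falls back to its default.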

Key fields in results:

  • job_status: 'succeeded', 'failed', 'cancelled', etc.
  • connection_id, connection_name: The connection that ran the sync
  • actor_id, actor_name: The source or destination actor
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
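If you ran the query with `status_filter='all'` and `exclude_pinned=False`, you can still narrow the rows on the client side to unpinned connections that recently succeeded. The sketch relies on `pinned_version_id` being NULL (`None`) for unpinned actors, as listed above. `unpinned_healthy_connections` is a hypothetical helper.

```python
from typing import Any


def unpinned_healthy_connections(rows: list[dict[str, Any]]) -> list[str]:
    """Distinct connection_ids with a succeeded job and no version pin (hypothetical sketch)."""
    seen: set[str] = set()
    ordered: list[str] = []
    for row in rows:
        if row.get("job_status") != "succeeded":
            continue
        # pinned_version_id is None when no pin applies at any scope.
        if row.get("pinned_version_id") is not None:
            continue
        connection_id = row.get("connection_id")
        if connection_id and connection_id not in seen:
            seen.add(connection_id)
            ordered.append(connection_id)
    return ordered
```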

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'.
destination_definition_id string | null no null Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB.
destination_canonical_name string | null no null Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'.
status_filter enum("all", "succeeded", "failed") no "all" Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
lookback_days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
exclude_pinned boolean no false If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
    },
    "status_filter": {
      "description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_recent_syncs_for_version_pinned_connector

Hints: read-only · idempotent

List sync job results for actors PINNED to a specific connector version.

IMPORTANT: This tool ONLY returns results for actors that are effectively pinned to the specified version via scoped_configuration at any scope level (actor, workspace, or organization). Most connections run unpinned and will NOT appear in these results.

Use this tool when you want to monitor rollout health for actors that have been pinned to a pre-release or specific version. For finding healthy connections across ALL actors using a connector type (regardless of pinning), use query_prod_recent_syncs_for_connector instead.

The actor_id field is the actor ID (superset of source_id/destination_id).

Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, connector_definition_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
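To monitor rollout health, you can compute a success rate per `pin_scope_type` from these rows. The rate only reflects the rows that were returned, so a result capped by `limit` may be a partial sample. `success_rate_by_scope` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Any


def success_rate_by_scope(rows: list[dict[str, Any]]) -> dict[str, float]:
    """Share of succeeded jobs per pin scope level (hypothetical sketch)."""
    totals: defaultdict = defaultdict(int)
    successes: defaultdict = defaultdict(int)
    for row in rows:
        scope = row.get("pin_scope_type") or "unknown"
        totals[scope] += 1
        if row.get("job_status") == "succeeded":
            successes[scope] += 1
    return {scope: successes[scope] / totals[scope] for scope in totals}
```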

Parameters:

Name Type Required Default Description
connector_version_id string yes Connector version UUID to find sync results for
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
successful_only boolean no false If True, only return successful syncs (default: False)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find sync results for",
      "type": "string"
    },
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "successful_only": {
      "default": false,
      "description": "If True, only return successful syncs (default: False)",
      "type": "boolean"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_workspace_info

Hints: read-only · idempotent

Get workspace information including dataplane group.

Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.

Returns a dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone, or None if the workspace is not found.
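Callers must handle the `None` case before checking the region. The sketch below derives an EU flag from the returned dict. It assumes EU dataplane names start with "EU", which is a guess based on the 'US' / 'EU' examples elsewhere on this page, not the tool's documented rule. `workspace_is_eu` is a hypothetical helper.

```python
from typing import Any, Optional


def workspace_is_eu(info: Optional[dict[str, Any]]) -> Optional[bool]:
    """True/False for EU membership, None when unknown (hypothetical sketch)."""
    if info is None:
        return None  # workspace not found
    name = info.get("dataplane_name")
    if not name:
        return None
    # Assumption: EU dataplane names start with "EU".
    return name.strip().upper().startswith("EU")
```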

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace.
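The schema describes aliases as an enum whose member names are aliases (such as "@devin-ai-sandbox") and whose values are UUIDs, resolved through `WorkspaceAliasEnum.resolve()`. The sketch below is a hypothetical stand-in for that behavior and does not reproduce the real class. It uses Enum's functional API because alias names are not valid Python identifiers, and it includes only the one alias documented here.

```python
from enum import Enum

# Hypothetical stand-in: member names are aliases, values are workspace UUIDs.
WorkspaceAlias = Enum(
    "WorkspaceAlias",
    {"@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61"},
)


def resolve_workspace_id(value: str) -> str:
    """Map a known alias to its UUID; pass any other value through unchanged."""
    if value in WorkspaceAlias.__members__:
        return WorkspaceAlias[value].value
    return value
```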

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    }
  },
  "required": [
    "workspace_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_workspaces_by_email_domain

Hints: read-only · idempotent

Find workspaces by email domain.

This tool searches for workspaces where users have email addresses matching the specified domain, which helps identify workspaces belonging to specific companies. For example, searching for "motherduck.com" finds workspaces belonging to MotherDuck employees.

Use cases:

  • Finding partner organization connections for testing connector fixes
  • Identifying internal test accounts for specific integrations
  • Locating workspaces belonging to technology partners

The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.

Parameters:

Name Type Required Default Description
email_domain string yes Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.
limit integer no 100 Maximum number of workspaces to return (default: 100)
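Because `email_domain` must not include the '@' symbol, it can help to normalize user input before calling the tool. The sketch below accepts a bare domain, a domain with a leading '@', or a full address. It lowercases the result on the assumption that matching is case-insensitive, which this page does not state. `normalize_email_domain` is a hypothetical helper.

```python
def normalize_email_domain(value: str) -> str:
    """Reduce '@Example.com' or 'jane@example.com' to 'example.com' (hypothetical sketch)."""
    # Assumption: domain matching is case-insensitive, so lowercase is safe.
    domain = value.strip().lower()
    # Keep only the part after the last '@', dropping any local part.
    if "@" in domain:
        domain = domain.rsplit("@", 1)[1]
    if not domain or "." not in domain:
        raise ValueError(f"Not a usable email domain: {value!r}")
    return domain
```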

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "email_domain": {
      "description": "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.",
      "type": "string"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of workspaces to return (default: 100)",
      "type": "integer"
    }
  },
  "required": [
    "email_domain"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of looking up workspaces by email domain.",
  "properties": {
    "email_domain": {
      "description": "The email domain that was searched for (e.g., 'motherduck.com')",
      "type": "string"
    },
    "total_workspaces_found": {
      "description": "Total number of workspaces matching the email domain",
      "type": "integer"
    },
    "unique_organization_ids": {
      "description": "List of unique organization IDs found",
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "workspaces": {
      "description": "List of workspaces matching the email domain",
      "items": {
        "description": "Information about a workspace found by email domain search.",
        "properties": {
          "organization_id": {
            "description": "The organization UUID",
            "type": "string"
          },
          "workspace_id": {
            "description": "The workspace UUID",
            "type": "string"
          },
          "workspace_name": {
            "description": "The name of the workspace",
            "type": "string"
          },
          "slug": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The workspace slug (URL-friendly identifier)"
          },
          "email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The email address associated with the workspace"
          },
          "dataplane_group_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The dataplane group UUID (region)"
          },
          "dataplane_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The name of the dataplane (e.g., 'US', 'EU')"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the workspace was created"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
          },
          "is_eu": {
            "anyOf": [
              {
                "type": "boolean"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Whether the workspace is in the EU region (derived from dataplane_name)."
          }
        },
        "required": [
          "organization_id",
          "workspace_id",
          "workspace_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "email_domain",
    "total_workspaces_found",
    "unique_organization_ids",
    "workspaces"
  ],
  "type": "object"
}
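The output schema above is easier to read next to a concrete payload. The sketch below shows one way a client might consume it. summarize_workspaces_by_domain is a hypothetical helper and is not part of this module. The sketch assumes the tool's structured result has already been parsed into a Python dict. The sample payload uses placeholder IDs, not real workspace data. Optional fields such as is_eu default to null in the schema, so the helper treats a missing key and an explicit null the same way. The consistency check assumes unique_organization_ids is derived from the returned workspace list.

```python
from collections import defaultdict
from typing import Any


def summarize_workspaces_by_domain(result: dict[str, Any]) -> dict[str, Any]:
    """Summarize a payload shaped like the output schema above.

    Hypothetical client-side helper: it relies only on the field names
    declared in the schema, not on any function from this module.
    """
    workspaces = result["workspaces"]

    # Group workspace names under their (required) organization UUID.
    by_org: dict[str, list[str]] = defaultdict(list)
    for ws in workspaces:
        by_org[ws["organization_id"]].append(ws["workspace_name"])

    # is_eu is nullable and defaults to null, so count True and None separately.
    eu_count = sum(1 for ws in workspaces if ws.get("is_eu") is True)
    unknown_region = sum(1 for ws in workspaces if ws.get("is_eu") is None)

    # Assumed invariant: the summary fields agree with the workspace list.
    consistent = (
        result["total_workspaces_found"] == len(workspaces)
        and set(result["unique_organization_ids"]) == set(by_org)
    )

    return {
        "email_domain": result["email_domain"],
        "workspaces_per_org": {org: len(names) for org, names in by_org.items()},
        "eu_workspaces": eu_count,
        "unknown_region": unknown_region,
        "consistent": consistent,
    }


# Placeholder payload (not real data): required fields plus an optional is_eu.
sample = {
    "email_domain": "example.com",
    "total_workspaces_found": 3,
    "unique_organization_ids": ["org-a", "org-b"],
    "workspaces": [
        {"organization_id": "org-a", "workspace_id": "ws-1",
         "workspace_name": "Prod", "is_eu": True},
        {"organization_id": "org-a", "workspace_id": "ws-2",
         "workspace_name": "Staging", "is_eu": False},
        {"organization_id": "org-b", "workspace_id": "ws-3",
         "workspace_name": "Sandbox"},
    ],
}

summary = summarize_workspaces_by_domain(sample)
print(summary)
```

Grouping by organization_id mirrors how unique_organization_ids is presented. This makes it easy to see which organizations own several workspaces on the same email domain.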

   1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
   2"""MCP tools for querying the Airbyte Cloud Prod DB Replica.
   3
   4This module provides MCP tools that wrap the query functions from
   5airbyte_ops_mcp.prod_db_access.queries for use by AI agents.
   6
   7## MCP reference
   8
   9.. include:: ../../../docs/mcp-generated/prod_db_queries.md
  10    :start-line: 2
  11"""
  12
  13from __future__ import annotations
  14
  15__all__: list[str] = []
  16
  17import json
  18from datetime import datetime, timezone
  19from enum import StrEnum
  20from typing import Annotated, Any
  21
  22import requests
  23from airbyte.exceptions import PyAirbyteInputError
  24from fastmcp import FastMCP
  25from fastmcp_extensions import mcp_tool, register_mcp_tools
  26from pydantic import BaseModel, Field
  27
  28from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum
  29from airbyte_ops_mcp.prod_db_access.queries import (
  30    query_actors_pinned_to_version,
  31    query_connections_by_connector,
  32    query_connections_by_destination_connector,
  33    query_connections_by_stream,
  34    query_connector_rollouts,
  35    query_connector_versions,
  36    query_dataplanes_list,
  37    query_destination_connection_stats,
  38    query_failed_sync_attempts_for_connector,
  39    query_new_connector_releases,
  40    query_recent_syncs_for_connector,
  41    query_source_connection_stats,
  42    query_syncs_for_version_pinned_connector,
  43    query_workspace_info,
  44    query_workspaces_by_email_domain,
  45)
  46from airbyte_ops_mcp.tier_cache import (
  47    TierFilter,
  48    enrich_rows_by_org,
  49    filter_rows_by_tier,
  50    get_org_tiers,
  51)
  52
  53
  54class StatusFilter(StrEnum):
  55    """Filter for job status in sync queries."""
  56
  57    ALL = "all"
  58    SUCCEEDED = "succeeded"
  59    FAILED = "failed"
  60
  61
  62# Cloud UI base URL for building connection URLs
  63CLOUD_UI_BASE_URL = "https://cloud.airbyte.com"
  64
  65
  66# =============================================================================
  67# Pydantic Models for MCP Tool Responses
  68# =============================================================================
  69
  70
  71class WorkspaceInfo(BaseModel):
  72    """Information about a workspace found by email domain search."""
  73
  74    organization_id: str = Field(description="The organization UUID")
  75    workspace_id: str = Field(description="The workspace UUID")
  76    workspace_name: str = Field(description="The name of the workspace")
  77    slug: str | None = Field(
  78        default=None, description="The workspace slug (URL-friendly identifier)"
  79    )
  80    email: str | None = Field(
  81        default=None, description="The email address associated with the workspace"
  82    )
  83    dataplane_group_id: str | None = Field(
  84        default=None, description="The dataplane group UUID (region)"
  85    )
  86    dataplane_name: str | None = Field(
  87        default=None, description="The name of the dataplane (e.g., 'US', 'EU')"
  88    )
  89    created_at: datetime | None = Field(
  90        default=None, description="When the workspace was created"
  91    )
  92    customer_tier: str | None = Field(
  93        default=None,
  94        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
  95    )
  96    is_eu: bool | None = Field(
  97        default=None,
  98        description="Whether the workspace is in the EU region (derived from dataplane_name).",
  99    )
 100
 101
 102class WorkspacesByEmailDomainResult(BaseModel):
 103    """Result of looking up workspaces by email domain."""
 104
 105    email_domain: str = Field(
 106        description="The email domain that was searched for (e.g., 'motherduck.com')"
 107    )
 108    total_workspaces_found: int = Field(
 109        description="Total number of workspaces matching the email domain"
 110    )
 111    unique_organization_ids: list[str] = Field(
 112        description="List of unique organization IDs found"
 113    )
 114    workspaces: list[WorkspaceInfo] = Field(
 115        description="List of workspaces matching the email domain"
 116    )
 117
 118
 119class LatestAttemptBreakdown(BaseModel):
 120    """Breakdown of connections by latest attempt status."""
 121
 122    succeeded: int = Field(
 123        default=0, description="Connections where latest attempt succeeded"
 124    )
 125    failed: int = Field(
 126        default=0, description="Connections where latest attempt failed"
 127    )
 128    cancelled: int = Field(
 129        default=0, description="Connections where latest attempt was cancelled"
 130    )
 131    running: int = Field(
 132        default=0, description="Connections where latest attempt is still running"
 133    )
 134    unknown: int = Field(
 135        default=0,
 136        description="Connections with no recent attempts in the lookback window",
 137    )
 138
 139
 140class VersionPinStats(BaseModel):
 141    """Stats for connections pinned to a specific version."""
 142
 143    pinned_version_id: str | None = Field(
 144        description="The connector version UUID (None for unpinned connections)"
 145    )
 146    docker_image_tag: str | None = Field(
 147        default=None, description="The docker image tag for this version"
 148    )
 149    total_connections: int = Field(description="Total number of connections")
 150    enabled_connections: int = Field(
 151        description="Number of enabled (active status) connections"
 152    )
 153    active_connections: int = Field(
 154        description="Number of connections with recent sync activity"
 155    )
 156    latest_attempt: LatestAttemptBreakdown = Field(
 157        description="Breakdown by latest attempt status"
 158    )
 159
 160
 161class ConnectorConnectionStats(BaseModel):
 162    """Aggregate connection stats for a connector."""
 163
 164    connector_definition_id: str = Field(description="The connector definition UUID")
 165    connector_type: str = Field(description="'source' or 'destination'")
 166    canonical_name: str | None = Field(
 167        default=None, description="The canonical connector name if resolved"
 168    )
 169    total_connections: int = Field(
 170        description="Total number of non-deprecated connections"
 171    )
 172    enabled_connections: int = Field(
 173        description="Number of enabled (active status) connections"
 174    )
 175    active_connections: int = Field(
 176        description="Number of connections with recent sync activity"
 177    )
 178    pinned_connections: int = Field(
 179        description="Number of connections with explicit version pins"
 180    )
 181    unpinned_connections: int = Field(
 182        description="Number of connections on default version"
 183    )
 184    latest_attempt: LatestAttemptBreakdown = Field(
 185        description="Overall breakdown by latest attempt status"
 186    )
 187    by_version: list[VersionPinStats] = Field(
 188        description="Stats broken down by pinned version"
 189    )
 190
 191
 192class ConnectorConnectionStatsResponse(BaseModel):
 193    """Response containing connection stats for multiple connectors."""
 194
 195    sources: list[ConnectorConnectionStats] = Field(
 196        default_factory=list, description="Stats for source connectors"
 197    )
 198    destinations: list[ConnectorConnectionStats] = Field(
 199        default_factory=list, description="Stats for destination connectors"
 200    )
 201    active_within_days: int = Field(
 202        description="Lookback window used for 'active' connections"
 203    )
 204    generated_at: datetime = Field(description="When this response was generated")
 205
 206
 207def _opt_str(value: Any) -> str | None:
 208    """Convert a nullable value to str, returning None if the value is None/falsy."""
 209    return str(value) if value else None
 210
 211
 212# Cloud registry URL for resolving canonical names
 213CLOUD_REGISTRY_URL = (
 214    "https://connectors.airbyte.com/files/registries/v0/cloud_registry.json"
 215)
 216
 217
 218def _resolve_canonical_name_to_definition_id(canonical_name: str) -> str:
 219    """Resolve a canonical connector name to a definition ID.
 220
 221    Auto-detects whether the connector is a source or destination based on the
 222    canonical name prefix ("source-" or "destination-"). If no prefix is present,
 223    searches both sources and destinations.
 224
 225    Args:
 226        canonical_name: Canonical connector name (e.g., 'source-youtube-analytics',
 227            'destination-duckdb', 'YouTube Analytics', 'DuckDB').
 228
 229    Returns:
 230        The connector definition ID (UUID).
 231
 232    Raises:
 233        PyAirbyteInputError: If the canonical name cannot be resolved.
 234    """
 235    response = requests.get(CLOUD_REGISTRY_URL, timeout=60)
 236
 237    if response.status_code != 200:
 238        raise PyAirbyteInputError(
 239            message=f"Failed to fetch connector registry: {response.status_code}",
 240            context={"response": response.text},
 241        )
 242
 243    data = response.json()
 244    normalized_input = canonical_name.lower().strip()
 245
 246    # Determine which registries to search based on prefix
 247    is_source = normalized_input.startswith("source-")
 248    is_destination = normalized_input.startswith("destination-")
 249
 250    # Search sources if it looks like a source or has no prefix
 251    if is_source or not is_destination:
 252        sources = data.get("sources", [])
 253        for source in sources:
 254            source_name = source.get("name", "").lower()
 255            if source_name == normalized_input:
 256                return source["sourceDefinitionId"]
 257            slugified = source_name.replace(" ", "-")
 258            if (
 259                slugified == normalized_input
 260                or f"source-{slugified}" == normalized_input
 261            ):
 262                return source["sourceDefinitionId"]
 263
 264    # Search destinations if it looks like a destination or has no prefix
 265    if is_destination or not is_source:
 266        destinations = data.get("destinations", [])
 267        for destination in destinations:
 268            destination_name = destination.get("name", "").lower()
 269            if destination_name == normalized_input:
 270                return destination["destinationDefinitionId"]
 271            slugified = destination_name.replace(" ", "-")
 272            if (
 273                slugified == normalized_input
 274                or f"destination-{slugified}" == normalized_input
 275            ):
 276                return destination["destinationDefinitionId"]
 277
 278    # Build appropriate error message based on what was searched
 279    if is_source:
 280        connector_type = "source"
 281        hint = (
 282            "Use the exact canonical name (e.g., 'source-youtube-analytics') "
 283            "or display name (e.g., 'YouTube Analytics')."
 284        )
 285    elif is_destination:
 286        connector_type = "destination"
 287        hint = (
 288            "Use the exact canonical name (e.g., 'destination-duckdb') "
 289            "or display name (e.g., 'DuckDB')."
 290        )
 291    else:
 292        connector_type = "connector"
 293        hint = (
 294            "Use the exact canonical name (e.g., 'source-youtube-analytics', "
 295            "'destination-duckdb') or display name (e.g., 'YouTube Analytics', 'DuckDB')."
 296        )
 297
 298    raise PyAirbyteInputError(
 299        message=f"Could not find {connector_type} definition for canonical name: {canonical_name}",
 300        context={
 301            "hint": hint
 302            + " You can list available connectors using the connector registry tools.",
 303            "searched_for": canonical_name,
 304        },
 305    )
 306
 307
 308@mcp_tool(
 309    read_only=True,
 310    idempotent=True,
 311)
 312def query_prod_dataplanes() -> list[dict[str, Any]]:
 313    """List all dataplane groups with workspace counts.
 314
 315    Returns information about all active dataplane groups in Airbyte Cloud,
 316    including the number of workspaces in each. Useful for understanding
 317    the distribution of workspaces across regions (US, US-Central, EU).
 318
 319    Returns list of dicts with keys: dataplane_group_id, dataplane_name,
 320    organization_id, enabled, tombstone, created_at, workspace_count
 321    """
 322    return query_dataplanes_list()
 323
 324
 325@mcp_tool(
 326    read_only=True,
 327    idempotent=True,
 328)
 329def query_prod_workspace_info(
 330    workspace_id: Annotated[
 331        str | WorkspaceAliasEnum,
 332        Field(
 333            description="Workspace UUID or alias to look up. "
 334            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
 335        ),
 336    ],
 337) -> dict[str, Any] | None:
 338    """Get workspace information including dataplane group.
 339
 340    Returns details about a specific workspace, including which dataplane
 341    (region) it belongs to. Useful for determining if a workspace is in
 342    the EU region for filtering purposes.
 343
 344    Returns dict with keys: workspace_id, workspace_name, slug, organization_id,
 345    dataplane_group_id, dataplane_name, created_at, tombstone
 346    Or None if workspace not found.
 347    """
 348    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
 349    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
 350    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required
 351
 352    return query_workspace_info(resolved_workspace_id)
 353
 354
 355@mcp_tool(
 356    read_only=True,
 357    idempotent=True,
 358)
 359def query_prod_connector_versions(
 360    connector_definition_id: Annotated[
 361        str,
 362        Field(description="Connector definition UUID to list versions for"),
 363    ],
 364) -> list[dict[str, Any]]:
 365    """List all versions for a connector definition.
 366
 367    Returns all published versions of a connector, ordered by last_published
 368    date descending. Useful for understanding version history and finding
 369    specific version IDs for pinning or rollout monitoring.
 370
 371    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
 372    release_stage, support_level, cdk_version, language, last_published, release_date
 373    """
 374    return query_connector_versions(connector_definition_id)
 375
 376
 377@mcp_tool(
 378    read_only=True,
 379    idempotent=True,
 380)
 381def query_prod_new_connector_releases(
 382    days: Annotated[
 383        int,
 384        Field(description="Number of days to look back (default: 7)", default=7),
 385    ] = 7,
 386    limit: Annotated[
 387        int,
 388        Field(description="Maximum number of results (default: 100)", default=100),
 389    ] = 100,
 390) -> list[dict[str, Any]]:
 391    """List recently published connector versions.
 392
 393    Returns connector versions published within the specified number of days.
 394    Uses last_published timestamp which reflects when the version was actually
 395    deployed to the registry (not the changelog date).
 396
 397    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
 398    docker_image_tag, last_published, release_date, release_stage, support_level,
 399    cdk_version, language, created_at
 400    """
 401    return query_new_connector_releases(days=days, limit=limit)
 402
 403
 404@mcp_tool(
 405    read_only=True,
 406    idempotent=True,
 407)
 408def query_prod_actors_by_pinned_connector_version(
 409    connector_version_id: Annotated[
 410        str,
 411        Field(description="Connector version UUID to find pinned instances for"),
 412    ],
 413) -> list[dict[str, Any]]:
 414    """List actors (sources/destinations) effectively pinned to a specific connector version.
 415
 416    Returns all actors that are effectively pinned to a specific connector version,
 417    considering all scope levels: actor-level pins, workspace-level pins, and
 418    organization-level pins (with actor > workspace > organization precedence).
 419    Useful for monitoring rollouts and understanding which customers are affected.
 420
 421    The actor_id field is the actor ID (superset of source_id/destination_id).
 422
 423    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
 424    origin, description, created_at, expires_at, pin_scope_type, actor_name,
 425    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
 426
 427    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
 428    level the effective pin came from.
 429    """
 430    return query_actors_pinned_to_version(connector_version_id)
 431
 432
 433@mcp_tool(
 434    read_only=True,
 435    idempotent=True,
 436)
 437def query_prod_recent_syncs_for_version_pinned_connector(
 438    connector_version_id: Annotated[
 439        str,
 440        Field(description="Connector version UUID to find sync results for"),
 441    ],
 442    days: Annotated[
 443        int,
 444        Field(description="Number of days to look back (default: 7)", default=7),
 445    ] = 7,
 446    limit: Annotated[
 447        int,
 448        Field(description="Maximum number of results (default: 100)", default=100),
 449    ] = 100,
 450    successful_only: Annotated[
 451        bool,
 452        Field(
 453            description="If True, only return successful syncs (default: False)",
 454            default=False,
 455        ),
 456    ] = False,
 457) -> list[dict[str, Any]]:
 458    """List sync job results for actors PINNED to a specific connector version.
 459
 460    IMPORTANT: This tool ONLY returns results for actors that are effectively
 461    pinned to the specified version via scoped_configuration at any scope level
 462    (actor, workspace, or organization). Most connections run unpinned and will
 463    NOT appear in these results.
 464
 465    Use this tool when you want to monitor rollout health for actors that have been
 466    pinned to a pre-release or specific version. For finding healthy connections
 467    across ALL actors using a connector type (regardless of pinning),
 468    use query_prod_recent_syncs_for_connector instead.
 469
 470    The actor_id field is the actor ID (superset of source_id/destination_id).
 471
 472    Returns list of dicts with keys: job_id, connection_id, job_status, started_at,
 473    job_updated_at, connection_name, actor_id, actor_name, connector_definition_id,
 474    pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name,
 475    organization_id, dataplane_group_id, dataplane_name
 476
 477    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
 478    level the effective pin came from.
 479    """
 480    return query_syncs_for_version_pinned_connector(
 481        connector_version_id,
 482        days=days,
 483        limit=limit,
 484        successful_only=successful_only,
 485    )
 486
 487
 488@mcp_tool(
 489    read_only=True,
 490    idempotent=True,
 491    open_world=True,
 492)
 493def query_prod_recent_syncs_for_connector(
 494    source_definition_id: Annotated[
 495        str | None,
 496        Field(
 497            description=(
 498                "Source connector definition ID (UUID) to search for. "
 499                "Provide this OR source_canonical_name OR destination_definition_id "
 500                "OR destination_canonical_name (exactly one required). "
 501                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
 502            ),
 503            default=None,
 504        ),
 505    ],
 506    source_canonical_name: Annotated[
 507        str | None,
 508        Field(
 509            description=(
 510                "Canonical source connector name to search for. "
 511                "Provide this OR source_definition_id OR destination_definition_id "
 512                "OR destination_canonical_name (exactly one required). "
 513                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
 514            ),
 515            default=None,
 516        ),
 517    ],
 518    destination_definition_id: Annotated[
 519        str | None,
 520        Field(
 521            description=(
 522                "Destination connector definition ID (UUID) to search for. "
 523                "Provide this OR destination_canonical_name OR source_definition_id "
 524                "OR source_canonical_name (exactly one required). "
 525                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
 526            ),
 527            default=None,
 528        ),
 529    ],
 530    destination_canonical_name: Annotated[
 531        str | None,
 532        Field(
 533            description=(
 534                "Canonical destination connector name to search for. "
 535                "Provide this OR destination_definition_id OR source_definition_id "
 536                "OR source_canonical_name (exactly one required). "
 537                "Examples: 'destination-duckdb', 'DuckDB'."
 538            ),
 539            default=None,
 540        ),
 541    ],
 542    status_filter: Annotated[
 543        StatusFilter,
 544        Field(
 545            description=(
 546                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
 547                "Use 'succeeded' to find healthy connections with recent successful syncs. "
 548                "Use 'failed' to find connections with recent failures."
 549            ),
 550            default=StatusFilter.ALL,
 551        ),
 552    ],
 553    organization_id: Annotated[
 554        str | OrganizationAliasEnum | None,
 555        Field(
 556            description=(
 557                "Optional organization ID (UUID) or alias to filter results. "
 558                "If provided, only syncs from this organization will be returned. "
 559                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
 560            ),
 561            default=None,
 562        ),
 563    ],
 564    lookback_days: Annotated[
 565        int,
 566        Field(description="Number of days to look back (default: 7)", default=7),
 567    ],
 568    limit: Annotated[
 569        int,
 570        Field(description="Maximum number of results (default: 100)", default=100),
 571    ],
 572    customer_tier_filter: Annotated[
 573        TierFilter,
 574        Field(
 575            description=(
 576                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
 577                "Filters results to only include connections belonging to organizations "
 578                "in the specified tier. Use 'ALL' to include all tiers."
 579            ),
 580        ),
 581    ] = "TIER_2",
 582    *,
 583    exclude_pinned: Annotated[
 584        bool,
 585        Field(
 586            description=(
 587                "If True, exclude syncs for actors that are already pinned to a "
 588                "specific version (at any scope level: actor, workspace, or organization). "
 589                "Useful for 'prove fix' workflows where you want to find unpinned "
 590                "connections for live testing. Default: False (include all syncs)."
 591            ),
 592            default=False,
 593        ),
 594    ],
 595) -> list[dict[str, Any]]:
 596    """List recent sync jobs for ALL actors using a connector type.
 597
 598    This tool finds all actors with the given connector definition and returns their
 599    recent sync jobs, regardless of whether they have explicit version pins. It filters
 600    out deleted actors, deleted workspaces, and deprecated connections.
 601
 602    Results are always enriched with customer_tier and is_eu fields.
 603    The customer_tier_filter parameter is required to ensure tier-aware querying.
 604
 605    Use this tool to:
 606    - Find healthy connections with recent successful syncs (status_filter='succeeded')
 607    - Investigate connector issues across all users (status_filter='failed')
 608    - Get an overview of all recent sync activity (status_filter='all')
 609
 610    Set exclude_pinned=True to filter out syncs for actors that are already pinned to a
 611    specific version. This is useful for 'prove fix' live connection testing workflows
 612    where you want to find unpinned connections to test against.
 613
 614    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
 615    source_definition_id, source_canonical_name, destination_definition_id,
 616    or destination_canonical_name.
 617
 618    Key fields in results:
 619    - job_status: 'succeeded', 'failed', 'cancelled', etc.
 620    - connection_id, connection_name: The connection that ran the sync
 621    - actor_id, actor_name: The source or destination actor
 622    - customer_tier: TIER_0, TIER_1, or TIER_2
 623    - is_eu: Whether the workspace is in the EU region
 624    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
 625    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
 626    """
 627    # Validate that exactly one connector parameter is provided
 628    provided_params = [
 629        source_definition_id,
 630        source_canonical_name,
 631        destination_definition_id,
 632        destination_canonical_name,
 633    ]
 634    num_provided = sum(p is not None for p in provided_params)
 635    if num_provided != 1:
 636        raise PyAirbyteInputError(
 637            message=(
 638                "Exactly one of source_definition_id, source_canonical_name, "
 639                "destination_definition_id, or destination_canonical_name must be provided."
 640            ),
 641        )
 642
 643    # Determine if this is a destination connector
 644    is_destination = (
 645        destination_definition_id is not None or destination_canonical_name is not None
 646    )
 647
 648    # Resolve canonical name to definition ID if needed
 649    resolved_definition_id: str
 650    if source_canonical_name:
 651        resolved_definition_id = _resolve_canonical_name_to_definition_id(
 652            canonical_name=source_canonical_name,
 653        )
 654    elif destination_canonical_name:
 655        resolved_definition_id = _resolve_canonical_name_to_definition_id(
 656            canonical_name=destination_canonical_name,
 657        )
 658    elif source_definition_id:
 659        resolved_definition_id = source_definition_id
 660    else:
 661        # We've validated exactly one param is provided, so this must be set
 662        assert destination_definition_id is not None
 663        resolved_definition_id = destination_definition_id
 664
 665    # Resolve organization ID alias
 666    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
 667
 668    rows = query_recent_syncs_for_connector(
 669        connector_definition_id=resolved_definition_id,
 670        is_destination=is_destination,
 671        status_filter=status_filter,
 672        organization_id=resolved_organization_id,
 673        days=lookback_days,
 674        limit=limit,
 675        exclude_pinned=exclude_pinned,
 676    )
 677
 678    enriched = enrich_rows_by_org(rows)
 679    return filter_rows_by_tier(enriched, customer_tier_filter)
 680
 681
 682@mcp_tool(
 683    read_only=True,
 684    idempotent=True,
 685    open_world=True,
 686)
 687def query_prod_failed_sync_attempts_for_connector(
 688    source_definition_id: Annotated[
 689        str | None,
 690        Field(
 691            description=(
 692                "Source connector definition ID (UUID) to search for. "
 693                "Exactly one of this or source_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of this or source_definition_id is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only failed attempts from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """List failed sync attempts for ALL actors using a source connector type.

    This tool finds all actors with the given connector definition and returns their
    failed sync attempts, regardless of whether they have explicit version pins.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    This is useful for investigating connector issues across all users. Use this when
    you want to find failures for a connector type regardless of which version users
    are on.

    Note: This tool only supports SOURCE connectors. For destination connectors,
    a separate tool would be needed.

    Key fields in results:
    - failure_summary: JSON containing failure details including failureType and messages
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one of the two parameters is provided
    if (source_definition_id is None) == (source_canonical_name is None):
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided, but not both."
            ),
        )

    # Resolve canonical name to definition ID if needed. Use an explicit
    # `is not None` check so it agrees with the validation above (an empty
    # string counts as "provided" there and must not fall through here).
    resolved_definition_id: str
    if source_canonical_name is not None:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


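# Illustrative sketch (hypothetical helper, not part of this module's API).
# The tools in this module accept several mutually exclusive ways to name a
# connector and reject a call unless exactly one is given. For two arguments,
# `(a is None) == (b is None)` is True when both or neither are set; the
# helper below generalizes the "count the non-None values" check used by
# query_prod_connections_by_connector to any number of keyword arguments.
# The name `_example_require_exactly_one` is an assumption for illustration,
# and it raises ValueError instead of PyAirbyteInputError so it stays
# self-contained. As in the real checks, an empty string counts as provided.
def _example_require_exactly_one(**candidates: object) -> str:
    """Return the name of the single non-None argument, or raise ValueError."""
    provided = [name for name, value in candidates.items() if value is not None]
    if len(provided) != 1:
        names = ", ".join(candidates)
        raise ValueError(
            f"Exactly one of {names} must be provided; got {len(provided)}."
        )
    return provided[0]


# Example (hypothetical values):
# _example_require_exactly_one(
#     source_definition_id=None, source_canonical_name="source-klaviyo"
# )  # returns "source_canonical_name"

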
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 1000)", default=1000),
    ] = 1000,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude connections whose connector is already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all connections)."
            ),
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """Search for all connections using a specific source or destination connector type.

    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
    It finds all connections where the source or destination connector matches the
    specified type, regardless of how the connector is named by users.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Optionally filter by organization_id to limit results to a specific organization.
    Use '@airbyte-internal' as an alias for the Airbyte internal organization.

    Set exclude_pinned=True to filter out connections that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    For source queries, returns: connection_id, connection_name, connection_url, source_id,
    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
    pin_scope_type, customer_tier, is_eu.
    For destination queries, returns: connection_id, connection_name, connection_url,
    destination_id, destination_name, destination_definition_id, workspace_id,
    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from (NULL if not pinned).
    """
    # Validate that exactly one of the four connector parameters is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a source or destination query and resolve the definition ID.
    # Use `is not None` checks so the branch taken agrees with the validation above.
    is_source_query = (
        source_definition_id is not None or source_canonical_name is not None
    )
    resolved_definition_id: str

    if source_canonical_name is not None:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif source_definition_id is not None:
        resolved_definition_id = source_definition_id
    elif destination_canonical_name is not None:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    else:
        resolved_definition_id = destination_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    # Query the database based on connector type
    if is_source_query:
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "source_id": str(row["source_id"]),
                "source_name": row.get("source_name", ""),
                "source_definition_id": str(row["source_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]
    else:
        # Destination query
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "destination_id": str(row["destination_id"]),
                "destination_name": row.get("destination_name", ""),
                "destination_definition_id": str(row["destination_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_destination_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
            )
        ]

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


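# Illustrative sketch (hypothetical helper, not used by the tool above).
# The source and destination branches of query_prod_connections_by_connector
# build nearly identical dicts; they differ only in the "source_" versus
# "destination_" key prefix and in which query function supplies the rows.
# One way to share that logic is to pass the prefix in. The name
# `_example_shape_connection_row` and the `base_url` argument (standing in
# for CLOUD_UI_BASE_URL) are assumptions, and only a subset of the real
# output keys is shown.
def _example_shape_connection_row(
    row: dict,
    prefix: str,
    base_url: str,
) -> dict:
    """Shape one DB row; `prefix` is "source" or "destination"."""
    workspace_id = str(row["workspace_id"])
    connection_id = str(row["connection_id"])
    return {
        "workspace_id": workspace_id,
        "connection_id": connection_id,
        "connection_name": row.get("connection_name", ""),
        # Same URL layout as above: <base>/workspaces/<ws>/connections/<conn>/status
        "connection_url": (
            f"{base_url}/workspaces/{workspace_id}"
            f"/connections/{connection_id}/status"
        ),
        f"{prefix}_id": str(row[f"{prefix}_id"]),
        f"{prefix}_name": row.get(f"{prefix}_name", ""),
        f"{prefix}_definition_id": str(row[f"{prefix}_definition_id"]),
        "pin_scope_type": row.get("pin_scope_type"),
    }

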
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It's particularly useful when validating
    connector fixes that affect specific streams - you can quickly find
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
            ),
        )

    resolved_definition_id: str
    if source_canonical_name is not None:
        resolved_definition_id = _resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        assert source_definition_id is not None
        resolved_definition_id = source_definition_id

    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = [
        {
            "organization_id": str(row.get("organization_id", "")),
            "workspace_id": str(row["workspace_id"]),
            "workspace_name": row.get("workspace_name", ""),
            "connection_id": str(row["connection_id"]),
            "connection_name": row.get("connection_name", ""),
            "connection_status": row.get("connection_status", ""),
            "connection_url": (
                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                f"/connections/{row['connection_id']}/status"
            ),
            "source_id": str(row["source_id"]),
            "source_name": row.get("source_name", ""),
            "source_definition_id": str(row["source_definition_id"]),
            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
            "dataplane_name": row.get("dataplane_name", ""),
        }
        for row in query_connections_by_stream(
            connector_definition_id=resolved_definition_id,
            stream_name=stream_name,
            organization_id=resolved_organization_id,
            limit=limit,
        )
    ]
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


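# Illustrative sketch (hypothetical helper). The real stream lookup runs as
# SQL over the JSONB catalog inside query_connections_by_stream, which is not
# shown here. This Python version only illustrates the matching rule, assuming
# the Airbyte protocol's ConfiguredAirbyteCatalog shape, in which each entry of
# "streams" nests the stream definition (with its "name") under "stream".
# Matching is exact and case-sensitive, per the stream_name description.
def _example_catalog_has_stream(catalog: dict, stream_name: str) -> bool:
    """Return True if any configured stream in `catalog` is named `stream_name`."""
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )

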
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspaces_by_email_domain(
    email_domain: Annotated[
        str,
        Field(
            description=(
                "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). "
                "Do not include the '@' symbol. This will find workspaces where users "
                "have email addresses with this domain."
            ),
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of workspaces to return (default: 100)",
            default=100,
        ),
    ] = 100,
) -> WorkspacesByEmailDomainResult:
    """Find workspaces by email domain.

    This tool searches for workspaces where users have email addresses matching
    the specified domain. This is useful for identifying workspaces belonging to
    specific companies - for example, searching for "motherduck.com" will find
    workspaces belonging to MotherDuck employees.

    Use cases:
    - Finding partner organization connections for testing connector fixes
    - Identifying internal test accounts for specific integrations
    - Locating workspaces belonging to technology partners

    The returned organization IDs can be used with other tools like
    `query_prod_connections_by_connector` to find connections within
    those organizations for safe testing.
    """
    # Strip leading @ if provided
    clean_domain = email_domain.lstrip("@")

    # Query the database
    rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)

    # Convert rows to Pydantic models
    workspaces = [
        WorkspaceInfo(
            organization_id=str(row["organization_id"]),
            workspace_id=str(row["workspace_id"]),
            workspace_name=row.get("workspace_name", ""),
            slug=row.get("slug"),
            email=row.get("email"),
            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
            dataplane_name=row.get("dataplane_name"),
            created_at=row.get("created_at"),
        )
        for row in rows
    ]

    # Enrich with tier annotation (annotation only, no filtering)
    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
    for ws in workspaces:
        tier_result = tier_results.get(ws.organization_id)
        if tier_result:
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspacesByEmailDomainResult(
        email_domain=clean_domain,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )


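# Illustrative sketch (hypothetical helper) of two small steps in
# query_prod_workspaces_by_email_domain. str.lstrip("@") removes any leading
# "@" characters, so "@motherduck.com" and "motherduck.com" are treated the
# same (no case folding is applied). list(dict.fromkeys(...)) removes duplicate
# organization IDs while keeping first-seen order, because dicts preserve
# insertion order (Python 3.7+); a set would not guarantee that order.
def _example_clean_domain_and_orgs(
    email_domain: str,
    org_ids: list[str],
) -> tuple[str, list[str]]:
    """Normalize the domain and de-duplicate org IDs in first-seen order."""
    clean_domain = email_domain.lstrip("@")
    unique_org_ids = list(dict.fromkeys(org_ids))
    return clean_domain, unique_org_ids

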
def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
        total_unknown += row_unknown

        by_version.append(
            VersionPinStats(
                pinned_version_id=str(version_id) if version_id else None,
                docker_image_tag=version_tags.get(str(version_id))
                if version_id
                else None,
                total_connections=row_total,
                enabled_connections=row_enabled,
                active_connections=row_active,
                latest_attempt=LatestAttemptBreakdown(
                    succeeded=row_succeeded,
                    failed=row_failed,
                    cancelled=row_cancelled,
                    running=row_running,
                    unknown=row_unknown,
                ),
            )
        )

    return ConnectorConnectionStats(
        connector_definition_id=connector_definition_id,
        connector_type=connector_type,
        canonical_name=canonical_name,
        total_connections=total_connections,
        enabled_connections=enabled_connections,
        active_connections=active_connections,
        pinned_connections=pinned_connections,
        unpinned_connections=unpinned_connections,
        latest_attempt=LatestAttemptBreakdown(
            succeeded=total_succeeded,
            failed=total_failed,
            cancelled=total_cancelled,
            running=total_running,
            unknown=total_unknown,
        ),
        by_version=by_version,
    )


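# Illustrative sketch (hypothetical helper, not called by this module).
# _build_connector_stats keeps ten running totals by hand. The same
# connector-level totals can be computed by summing each count column across
# the per-version rows in a single dict comprehension. The column names match
# the ones read above; unlike int(row.get(col, 0)), this variant also treats an
# explicit None value as 0.
_EXAMPLE_COUNT_COLUMNS = (
    "total_connections",
    "enabled_connections",
    "active_connections",
    "pinned_connections",
    "unpinned_connections",
    "succeeded_connections",
    "failed_connections",
    "cancelled_connections",
    "running_connections",
    "unknown_connections",
)


def _example_sum_count_columns(rows: list[dict]) -> dict[str, int]:
    """Sum each count column across all version-group rows (missing or None -> 0)."""
    return {
        col: sum(int(row.get(col) or 0) for row in rows)
        for col in _EXAMPLE_COUNT_COLUMNS
    }

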
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connector_connection_stats(
    source_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of source connector definition IDs (UUIDs) to get stats for. "
                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
            ),
            default=None,
        ),
    ] = None,
    destination_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of destination connector definition IDs (UUIDs) to get stats for. "
                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
            ),
            default=None,
        ),
    ] = None,
    active_within_days: Annotated[
        int,
        Field(
            description=(
                "Number of days to look back for 'active' connections (default: 7). "
                "Connections with sync activity within this window are counted as active."
            ),
            default=7,
        ),
    ] = 7,
) -> ConnectorConnectionStatsResponse:
    """Get aggregate connection stats for multiple connectors.

    Returns counts of connections grouped by pinned version for each connector,
    including:
    - Total, enabled, and active connection counts
    - Pinned vs unpinned breakdown
    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

    This tool is designed for release monitoring workflows. It allows you to:
    1. Query recently released connectors to identify which ones to monitor
    2. Get aggregate stats showing how many connections are using each version
    3. See health metrics (pass/fail) broken down by version

    The 'active_within_days' parameter controls the lookback window for:
    - Counting 'active' connections (those with recent sync activity)
    - Determining 'latest attempt status' (most recent attempt within the window)

    Connections with no sync activity in the lookback window will have
    'unknown' status in the latest_attempt breakdown.
    """
    # Initialize empty lists if None
    source_ids = source_definition_ids or []
    destination_ids = destination_definition_ids or []

    if not source_ids and not destination_ids:
        raise PyAirbyteInputError(
            message=(
                "At least one of source_definition_ids or destination_definition_ids "
                "must be provided."
            ),
        )

    sources: list[ConnectorConnectionStats] = []
    destinations: list[ConnectorConnectionStats] = []

    # Process source connectors
    for source_def_id in source_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(source_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_source_connection_stats(source_def_id, days=active_within_days)

        sources.append(
            _build_connector_stats(
                connector_definition_id=source_def_id,
                connector_type="source",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    # Process destination connectors
    for dest_def_id in destination_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(dest_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_destination_connection_stats(dest_def_id, days=active_within_days)

        destinations.append(
            _build_connector_stats(
                connector_definition_id=dest_def_id,
                connector_type="destination",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    return ConnectorConnectionStatsResponse(
        sources=sources,
        destinations=destinations,
        active_within_days=active_within_days,
        generated_at=datetime.now(timezone.utc),
    )


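# Illustrative sketch (hypothetical, for consumers of the response above).
# One way to turn a by_version entry's latest_attempt breakdown into a
# pass/fail signal for release monitoring. It takes a plain dict shaped like
# LatestAttemptBreakdown (succeeded/failed/cancelled/running/unknown) so it
# runs standalone. Treating cancelled, running, and unknown attempts as
# "no signal" is an assumption of this sketch, not a rule stated by the tool.
def _example_failure_rate(latest_attempt: dict[str, int]) -> float | None:
    """Return failed / (succeeded + failed), or None when neither has occurred."""
    succeeded = latest_attempt.get("succeeded", 0)
    failed = latest_attempt.get("failed", 0)
    decided = succeeded + failed
    if decided == 0:
        return None
    return failed / decided

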
# =============================================================================
# Connector Rollout Models and Tools
# =============================================================================


class ConnectorRolloutInfo(BaseModel):
    """Information about a connector rollout."""

    rollout_id: str = Field(description="The rollout UUID")
    actor_definition_id: str = Field(description="The connector definition UUID")
    state: str = Field(
        description="Rollout state: initialized, workflow_started, in_progress, "
        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
    )
    initial_rollout_pct: int | None = Field(
        default=None, description="Initial rollout percentage"
    )
    current_target_rollout_pct: int | None = Field(
        default=None, description="Current target rollout percentage"
    )
    final_target_rollout_pct: int | None = Field(
        default=None, description="Final target rollout percentage"
    )
    has_breaking_changes: bool = Field(
        description="Whether the RC has breaking changes"
    )
    max_step_wait_time_mins: int | None = Field(
        default=None, description="Maximum wait time between rollout steps in minutes"
    )
    rollout_strategy: str | None = Field(
        default=None, description="Rollout strategy: manual, automated, overridden"
    )
    workflow_run_id: str | None = Field(
        default=None, description="Temporal workflow run ID"
    )
    error_msg: str | None = Field(default=None, description="Error message if errored")
    failed_reason: str | None = Field(
        default=None, description="Reason for failure if failed"
    )
    paused_reason: str | None = Field(
        default=None, description="Reason for pause if paused"
    )
    tag: str | None = Field(default=None, description="Optional tag for the rollout")
    created_at: datetime | None = Field(
        default=None, description="When the rollout was created"
    )
    updated_at: datetime | None = Field(
        default=None, description="When the rollout was last updated"
    )
    completed_at: datetime | None = Field(
        default=None, description="When the rollout completed (if terminal)"
    )
    expires_at: datetime | None = Field(
        default=None, description="When the rollout expires"
    )
    rc_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the release candidate"
    )
    rc_docker_repository: str | None = Field(
        default=None, description="Docker repository of the release candidate"
    )
    initial_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the initial version"
    )
    initial_docker_repository: str | None = Field(
        default=None, description="Docker repository of the initial version"
    )
    filters: dict[str, Any] | None = Field(
        default=None,
        description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})",
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier targeted by this rollout (extracted from filters), "
        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
    )


1511def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
    """Parse the rollout filters field from a database row.

    The filters field may be a JSON string, an already-decoded dict, or None.
    Returns None when the value is missing or does not decode to a JSON object.
    """
    if filters_raw is None:
        return None
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
    return None


def _extract_tier_from_filters(filters_raw: Any) -> str | None:
    """Extract the customer tier from rollout filters JSON.

    Supports two formats (the current format takes precedence if both are present):
    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`

    When the current format lists several tiers, they are returned joined with
    ", " (e.g., "TIER_0, TIER_1"). Returns None if no tier filter is found.
    """
    parsed = _parse_rollout_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: customerTierFilters list
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and len(values) == 1:
                    return str(values[0])
                if isinstance(values, list) and len(values) > 1:
                    return ", ".join(str(v) for v in values)

    # Legacy format: tierFilter dict
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict):
        tier = tier_filter.get("tier")
        if isinstance(tier, str):
            return tier

    return None


def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
    """Convert a database row to a ConnectorRolloutInfo model."""
    # Parse the filters column once and reuse the result for tier extraction.
    filters = _parse_rollout_filters(row.get("filters"))
    return ConnectorRolloutInfo(
        rollout_id=str(row["rollout_id"]),
        actor_definition_id=str(row["actor_definition_id"]),
        state=row["state"],
        initial_rollout_pct=row.get("initial_rollout_pct"),
        current_target_rollout_pct=row.get("current_target_rollout_pct"),
        final_target_rollout_pct=row.get("final_target_rollout_pct"),
        has_breaking_changes=row["has_breaking_changes"],
        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
        rollout_strategy=row.get("rollout_strategy"),
        workflow_run_id=row.get("workflow_run_id"),
        error_msg=row.get("error_msg"),
        failed_reason=row.get("failed_reason"),
        paused_reason=row.get("paused_reason"),
        tag=row.get("tag"),
        created_at=row.get("created_at"),
        updated_at=row.get("updated_at"),
        completed_at=row.get("completed_at"),
        expires_at=row.get("expires_at"),
        rc_docker_image_tag=row.get("rc_docker_image_tag"),
        rc_docker_repository=row.get("rc_docker_repository"),
        initial_docker_image_tag=row.get("initial_docker_image_tag"),
        initial_docker_repository=row.get("initial_docker_repository"),
        filters=filters,
        customer_tier=_extract_tier_from_filters(filters),
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_rollouts(
    actor_definition_id: Annotated[
        str | None,
        Field(description="Connector definition UUID to filter by (optional)"),
    ] = None,
    rollout_id: Annotated[
        str | None,
        Field(description="Specific rollout UUID to look up (optional)"),
    ] = None,
    active_only: Annotated[
        bool,
        Field(description="If true, only return active (non-terminal) rollouts"),
    ] = False,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)"),
    ] = 100,
) -> list[ConnectorRolloutInfo]:
    """Query connector rollouts with flexible filtering.

    Returns rollouts matching the provided filters. If no filters are specified,
    returns all active rollouts. Useful for monitoring rollout status and history.

    Filter behavior:
    - rollout_id: Returns that specific rollout (other filters are ignored)
    - active_only: Returns only active (non-terminal) rollouts
    - actor_definition_id: Returns rollouts for that specific connector
    - No filters: Returns all active rollouts (same as active_only=True)
    """
    rows = query_connector_rollouts(
        actor_definition_id=actor_definition_id,
        rollout_id=rollout_id,
        active_only=active_only,
        limit=limit,
    )
    return [_row_to_connector_rollout_info(row) for row in rows]


def register_prod_db_query_tools(app: FastMCP) -> None:
    """Register prod DB query tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)
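The tier extraction performed by `_extract_tier_from_filters` is the least obvious step in the listing above. The following is a minimal standalone sketch that mirrors that logic so it can be tried without the `airbyte_ops_mcp` package or a database connection. The names `parse_rollout_filters` and `extract_tier` are hypothetical stand-ins for the private helpers, and the sample inputs are the two filter shapes quoted in the docstring.

```python
from __future__ import annotations

import json
from typing import Any


def parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
    # Hypothetical copy of _parse_rollout_filters: accept an already-decoded
    # dict or a JSON string; anything else (or non-object JSON) becomes None.
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None
    return None


def extract_tier(filters_raw: Any) -> str | None:
    # Hypothetical copy of _extract_tier_from_filters.
    parsed = parse_rollout_filters(filters_raw)
    if parsed is None:
        return None
    # The current format is checked first: a list of {"name", "value", "operator"} entries.
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and values:
                    # One tier gives "TIER_1"; several give "TIER_0, TIER_1".
                    return ", ".join(str(v) for v in values)
    # Legacy format: {"tierFilter": {"tier": "TIER_0"}}.
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict) and isinstance(tier_filter.get("tier"), str):
        return tier_filter["tier"]
    return None


if __name__ == "__main__":
    legacy = {"tierFilter": {"tier": "TIER_0"}}
    current = '{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}'
    print(extract_tier(legacy))      # TIER_0
    print(extract_tier(current))     # TIER_1
    print(extract_tier("not json"))  # None
```

Because the current format is checked before the legacy one, a row carrying both shapes reports the `customerTierFilters` value, and an empty `value` list falls through to the legacy `tierFilter` check.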