airbyte_ops_mcp.mcp.prod_db_queries

MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

MCP reference

MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 13 tools, 0 prompts, 0 resources.

Tools (13)

query_prod_actors_by_pinned_connector_version

Hints: read-only · idempotent

List actors (sources/destinations) effectively pinned to a specific connector version.

Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.

The actor_id field identifies the actor. Sources and destinations are both actors, so an actor_id may be either a source_id or a destination_id.

Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
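
The precedence rule above (actor > workspace > organization) can be illustrated with a minimal sketch. The `effective_pin` helper and the shape of its input are hypothetical and only show how one effective pin falls out of up to three scope-level pins; the tool resolves this inside its database query.

```python
from typing import Optional

# Precedence from the description above: actor > workspace > organization.
SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def effective_pin(pins: dict[str, Optional[str]]) -> Optional[tuple[str, str]]:
    """Return (pin_scope_type, version_id) for the highest-precedence pin.

    `pins` maps a scope name to a pinned version ID, or None when that
    scope has no pin. This input shape is an assumption for illustration.
    """
    for scope in SCOPE_PRECEDENCE:
        version_id = pins.get(scope)
        if version_id is not None:
            return scope, version_id
    return None  # not pinned at any scope


# Hypothetical actor with workspace- and organization-level pins only.
example = {"actor": None, "workspace": "version-b", "organization": "version-a"}
print(effective_pin(example))  # ('workspace', 'version-b')
```

In this sketch the workspace pin wins because no actor-level pin exists, which is the case where pin_scope_type would read 'workspace'.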

Parameters:

Name Type Required Default Description
connector_version_id string yes Connector version UUID to find pinned instances for

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find pinned instances for",
      "type": "string"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
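
As the output schema shows, the list of actor dicts arrives wrapped under a "result" key. A small sketch of unwrapping it and summarizing a rollout's reach follows; the `summarize_pins` helper and the sample values are hypothetical, and only the key names come from the documentation above.

```python
from collections import Counter

# Hypothetical response shaped like the output schema: a list under "result".
response = {
    "result": [
        {"actor_id": "a1", "workspace_id": "w1", "pin_scope_type": "actor"},
        {"actor_id": "a2", "workspace_id": "w1", "pin_scope_type": "workspace"},
        {"actor_id": "a3", "workspace_id": "w2", "pin_scope_type": "workspace"},
    ]
}


def summarize_pins(response: dict) -> tuple[Counter, set]:
    """Count actors per pin scope and collect the affected workspace IDs."""
    actors = response["result"]
    by_scope = Counter(actor["pin_scope_type"] for actor in actors)
    workspaces = {actor["workspace_id"] for actor in actors}
    return by_scope, workspaces


by_scope, workspaces = summarize_pins(response)
print(dict(by_scope))      # {'actor': 1, 'workspace': 2}
print(sorted(workspaces))  # ['w1', 'w2']
```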

query_prod_connections_by_connector

Hints: read-only · idempotent · open-world

Search for all connections using a specific source or destination connector type.

This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.

Results are always enriched with customer_tier and is_eu fields. A tier filter always applies, which keeps querying tier-aware: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' explicitly to include every tier.

Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.

Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.

For source queries, each dict contains: connection_id, connection_name, connection_url, source_id, source_name, source_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

For destination queries, each dict contains: connection_id, connection_name, connection_url, destination_id, destination_name, destination_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from (NULL if not pinned).

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
destination_definition_id string | null no null Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB.
destination_canonical_name string | null no null Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
limit integer no 1000 Maximum number of results (default: 1000)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
exclude_pinned boolean no false If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).
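
The "exactly one of" rule for the four connector selectors can be checked on the caller's side before invoking the tool. The sketch below is a hypothetical helper, not part of the module; the parameter names, defaults, and tier values are taken from the table above.

```python
CONNECTOR_SELECTORS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)
TIERS = ("TIER_0", "TIER_1", "TIER_2", "ALL")


def build_connections_by_connector_args(**kwargs) -> dict:
    """Validate arguments and fill in the documented defaults."""
    provided = [name for name in CONNECTOR_SELECTORS if kwargs.get(name) is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one of {CONNECTOR_SELECTORS} is required, got {provided}"
        )
    # Defaults as listed in the parameter table.
    args = {"limit": 1000, "customer_tier_filter": "TIER_2", "exclude_pinned": False}
    args.update({key: value for key, value in kwargs.items() if value is not None})
    if args["customer_tier_filter"] not in TIERS:
        raise ValueError(f"customer_tier_filter must be one of {TIERS}")
    return args


# Find unpinned connections across all tiers for a 'prove fix' workflow.
print(
    build_connections_by_connector_args(
        source_canonical_name="source-youtube-analytics",
        customer_tier_filter="ALL",
        exclude_pinned=True,
    )
)
```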

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of results (default: 1000)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_stream

Hints: read-only · idempotent · open-world

Find connections that have a specific stream enabled in their catalog.

This tool searches each connection's configured catalog (JSONB) for streams matching the specified name. It is particularly useful when validating connector fixes that affect specific streams, because it quickly finds customer connections that use the affected stream.

Results are always enriched with customer_tier and is_eu fields. A tier filter always applies, which keeps querying tier-aware: customer_tier_filter defaults to 'TIER_2', so pass 'ALL' explicitly to include every tier.

Use cases:

  • Finding connections with a specific stream enabled for regression testing
  • Validating connector fixes that affect particular streams
  • Identifying which customers use rarely-enabled streams

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.

Parameters:

Name Type Required Default Description
stream_name string yes Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
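
The exact-match requirement on stream_name can be illustrated in memory. This is a sketch only: the catalog shape used here (a "streams" list whose entries carry the name under "stream") is an assumption about the configured catalog, and the tool itself performs the match in the database rather than in Python.

```python
def has_stream(configured_catalog: dict, stream_name: str) -> bool:
    """Return True if the catalog contains a stream with exactly this name.

    Assumes a catalog shaped like {"streams": [{"stream": {"name": ...}}]}.
    """
    for entry in configured_catalog.get("streams", []):
        if entry.get("stream", {}).get("name") == stream_name:
            return True
    return False


# Hypothetical catalog with two configured streams.
catalog = {
    "streams": [
        {"stream": {"name": "campaigns"}},
        {"stream": {"name": "global_exclusions"}},
    ]
}
print(has_stream(catalog, "global_exclusions"))  # True
print(has_stream(catalog, "Global_Exclusions"))  # False: the match is exact
```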

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "stream_name": {
      "description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
      "type": "string"
    },
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "stream_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connector_connection_stats

Hints: read-only · idempotent · open-world

Get aggregate connection stats for multiple connectors.

Returns counts of connections grouped by pinned version for each connector, including:

  • Total, enabled, and active connection counts
  • Pinned vs unpinned breakdown
  • Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

This tool is designed for release monitoring workflows. It allows you to:

  1. Query recently released connectors to identify which ones to monitor
  2. Get aggregate stats showing how many connections are using each version
  3. See health metrics (pass/fail) broken down by version

The lookback_days parameter controls the lookback window for:

  • Counting 'active' connections (those with recent sync activity)
  • Determining 'latest attempt status' (most recent attempt within the window)

Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
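
The lookback behavior described above can be sketched as follows. The `latest_attempt_breakdown` helper and its input shape (a mapping from connection ID to a list of timestamp and status pairs) are hypothetical; the sketch only shows how a connection with no attempt inside the window ends up counted as 'unknown'.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone


def latest_attempt_breakdown(attempts_by_connection: dict, now: datetime,
                             lookback_days: int = 7) -> dict:
    """Count connections by the status of their latest attempt in the window."""
    cutoff = now - timedelta(days=lookback_days)
    breakdown = Counter()
    for attempts in attempts_by_connection.values():
        # Keep only attempts that fall inside the lookback window.
        recent = [attempt for attempt in attempts if attempt[0] >= cutoff]
        if not recent:
            breakdown["unknown"] += 1
            continue
        _, status = max(recent, key=lambda attempt: attempt[0])
        breakdown[status] += 1
    return dict(breakdown)


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
attempts = {
    "conn-1": [(now - timedelta(days=1), "succeeded"), (now - timedelta(days=2), "failed")],
    "conn-2": [(now - timedelta(days=9), "failed")],  # outside the 7-day window
    "conn-3": [],                                      # never synced
}
print(latest_attempt_breakdown(attempts, now))  # {'succeeded': 1, 'unknown': 2}
```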

Parameters:

Name Type Required Default Description
source_definition_ids array<string> | null no null List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']
destination_definition_ids array<string> | null no null List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']
lookback_days integer no 7 Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
    },
    "destination_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Response containing connection stats for multiple connectors.",
  "properties": {
    "sources": {
      "description": "Stats for source connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "destinations": {
      "description": "Stats for destination connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "lookback_days": {
      "description": "Lookback window used for 'active' connections",
      "type": "integer"
    },
    "generated_at": {
      "description": "When this response was generated",
      "format": "date-time",
      "type": "string"
    }
  },
  "required": [
    "lookback_days",
    "generated_at"
  ],
  "type": "object"
}
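
For release monitoring, the by_version entries in this response can be reduced to a per-version failure rate. The sketch below is one possible reading, not the module's own metric: it defines the rate as failed / (succeeded + failed) and ignores cancelled, running, and unknown counts. The sample numbers are made up.

```python
from typing import Optional


def failure_rate(latest_attempt: dict) -> Optional[float]:
    """Failed share of finished attempts, or None when nothing finished."""
    succeeded = latest_attempt.get("succeeded", 0)
    failed = latest_attempt.get("failed", 0)
    finished = succeeded + failed
    return failed / finished if finished else None


def failure_rates_by_version(connector_stats: dict) -> dict:
    """Map pinned_version_id (None for unpinned) to its failure rate."""
    return {
        entry["pinned_version_id"]: failure_rate(entry["latest_attempt"])
        for entry in connector_stats["by_version"]
    }


# Hypothetical entry from the "sources" or "destinations" list.
stats = {
    "by_version": [
        {"pinned_version_id": None, "latest_attempt": {"succeeded": 90, "failed": 10}},
        {"pinned_version_id": "version-rc",
         "latest_attempt": {"succeeded": 3, "failed": 1, "unknown": 2}},
    ]
}
print(failure_rates_by_version(stats))  # {None: 0.1, 'version-rc': 0.25}
```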

query_prod_connector_rollouts

Hints: read-only · idempotent

Query connector rollouts with flexible filtering.

Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.

Filter behavior:

  • rollout_id: Returns that specific rollout (ignores other filters)
  • active_only: Returns only active (non-terminal) rollouts
  • actor_definition_id: Returns rollouts for that specific connector
  • No filters: Returns all active rollouts (same as active_only=True)
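
The filter behavior listed above can be sketched as an in-memory selection. Everything here is illustrative: the `select_rollouts` helper, the boolean "active" field on each row, and the choice to combine actor_definition_id with active_only when both are given are assumptions, since the description does not spell out the combined case.

```python
from typing import Optional


def select_rollouts(rollouts: list[dict], rollout_id: Optional[str] = None,
                    actor_definition_id: Optional[str] = None,
                    active_only: bool = False, limit: int = 100) -> list[dict]:
    """Apply the documented filter precedence to a list of rollout rows."""
    # rollout_id wins outright and ignores the other filters.
    if rollout_id is not None:
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:limit]
    no_filters = actor_definition_id is None and not active_only
    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    # With no filters at all, behave like active_only=True.
    if active_only or no_filters:
        selected = [r for r in selected if r["active"]]
    return selected[:limit]


rows = [
    {"rollout_id": "r1", "actor_definition_id": "d1", "active": True},
    {"rollout_id": "r2", "actor_definition_id": "d1", "active": False},
    {"rollout_id": "r3", "actor_definition_id": "d2", "active": True},
]
print([r["rollout_id"] for r in select_rollouts(rows)])                            # ['r1', 'r3']
print([r["rollout_id"] for r in select_rollouts(rows, actor_definition_id="d1")])  # ['r1', 'r2']
print([r["rollout_id"] for r in select_rollouts(rows, rollout_id="r2", active_only=True)])  # ['r2']
```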

Parameters:

Name Type Required Default Description
actor_definition_id string | null no null Connector definition UUID to filter by (optional)
rollout_id string | null no null Specific rollout UUID to look up (optional)
active_only boolean no false If true, only return active (non-terminal) rollouts
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "actor_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector definition UUID to filter by (optional)"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specific rollout UUID to look up (optional)"
    },
    "active_only": {
      "default": false,
      "description": "If true, only return active (non-terminal) rollouts",
      "type": "boolean"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a connector rollout.",
        "properties": {
          "rollout_id": {
            "description": "The rollout UUID",
            "type": "string"
          },
          "actor_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "state": {
            "description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
            "type": "string"
          },
          "initial_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Initial rollout percentage"
          },
          "current_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Current target rollout percentage"
          },
          "final_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Final target rollout percentage"
          },
          "has_breaking_changes": {
            "description": "Whether the RC has breaking changes",
            "type": "boolean"
          },
          "max_step_wait_time_mins": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Maximum wait time between rollout steps in minutes"
          },
          "rollout_strategy": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Rollout strategy: manual, automated, overridden"
          },
          "updated_by_user_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "User ID recorded as last updating the rollout"
          },
          "updated_by_user_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Name recorded as last updating the rollout"
          },
          "updated_by_user_email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Email recorded as last updating the rollout"
          },
          "workflow_run_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Temporal workflow run ID"
          },
          "error_msg": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Error message if errored"
          },
          "failed_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for failure if failed"
          },
          "paused_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for pause if paused"
          },
          "tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Optional tag for the rollout"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was created"
          },
          "updated_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was last updated"
          },
          "completed_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout completed (if terminal)"
          },
          "expires_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout expires"
          },
          "rc_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the release candidate"
          },
          "rc_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the release candidate"
          },
          "initial_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the initial version"
          },
          "initial_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the initial version"
          },
          "filters": {
            "anyOf": [
              {
                "additionalProperties": true,
                "type": "object"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
          }
        },
        "required": [
          "rollout_id",
          "actor_definition_id",
          "state",
          "has_breaking_changes"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
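
The schema describes customer_tier as extracted from the raw filters object. A minimal sketch of that extraction follows, assuming the filters follow the shape of the example in the schema description (`{'tierFilter': {'tier': 'TIER_0'}}`); the helper name is hypothetical and the server may derive the value differently.

```python
from typing import Any, Optional


def customer_tier_from_filters(filters: Optional[dict[str, Any]]) -> Optional[str]:
    """Return the tier named in a rollout's raw filters, or None if absent."""
    if not filters:
        return None
    tier_filter = filters.get("tierFilter")
    if not isinstance(tier_filter, dict):
        # Unexpected shape: treat it as "no tier filter set".
        return None
    tier = tier_filter.get("tier")
    return tier if isinstance(tier, str) else None
```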

query_prod_connector_versions

Hints: read-only · idempotent

List all versions for a connector definition.

Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.

Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector_definition_id | string | yes | | Connector definition UUID to list versions for |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_definition_id": {
      "description": "Connector definition UUID to list versions for",
      "type": "string"
    }
  },
  "required": [
    "connector_definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
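
Because the result is a list of plain dicts, finding the version ID for a given image tag is a simple scan. The sketch below assumes rows carry the documented `docker_image_tag` and `version_id` keys; `find_version_id` is a hypothetical helper, not part of the module.

```python
from typing import Optional


def find_version_id(versions: list[dict], docker_image_tag: str) -> Optional[str]:
    """Return the version_id of the first row matching the tag, or None."""
    for row in versions:
        if row.get("docker_image_tag") == docker_image_tag:
            return row.get("version_id")
    return None
```

The returned ID could then be passed as connector_version_id to the pinned-version tools in this module.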

query_prod_dataplanes

Hints: read-only · idempotent

List all dataplane groups with workspace counts.

Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).

Returns list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count

Parameters:

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
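
To see how workspaces are distributed across regions, the workspace_count values can be turned into shares. This is a sketch over the documented keys (`dataplane_name`, `workspace_count`); the helper name is hypothetical.

```python
def workspace_share(dataplanes: list[dict]) -> dict[str, float]:
    """Map each dataplane name to its fraction of all counted workspaces."""
    total = sum(row["workspace_count"] for row in dataplanes)
    if total == 0:
        # Avoid division by zero when no workspaces are counted.
        return {row["dataplane_name"]: 0.0 for row in dataplanes}
    return {
        row["dataplane_name"]: row["workspace_count"] / total
        for row in dataplanes
    }
```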

query_prod_failed_sync_attempts_for_connector

Hints: read-only · idempotent · open-world

List failed sync attempts for ALL actors using a source connector type.

This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.

Results are always enriched with customer_tier and is_eu fields. A tier filter always applies, which keeps every query tier-aware: customer_tier_filter defaults to 'TIER_2', and 'ALL' includes every tier.

This is useful for investigating connector issues across all users. Use this when you want to find failures for a connector type regardless of which version users are on.

Note: This tool supports only SOURCE connectors. For destination connectors, query_prod_recent_syncs_for_connector with status_filter='failed' returns failed sync jobs.

Key fields in results:

  • failure_summary: JSON containing failure details including failureType and messages
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
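
A common follow-up is to tally failures by type. The sketch below assumes that failure_summary holds a `failures` list whose entries carry `failureType`; the reference above only states that the JSON includes failureType and messages, so the nesting is an assumption, as is the helper name. The value is accepted either as a JSON string or as an already-parsed dict.

```python
import json
from collections import Counter
from typing import Any


def count_failure_types(rows: list[dict[str, Any]]) -> Counter:
    """Count failureType values across rows (assumed failure_summary layout)."""
    counts: Counter = Counter()
    for row in rows:
        summary = row.get("failure_summary")
        if isinstance(summary, str):
            try:
                summary = json.loads(summary)
            except json.JSONDecodeError:
                continue  # Skip rows whose summary is not valid JSON.
        if not isinstance(summary, dict):
            continue
        # ASSUMPTION: failures are listed under a top-level "failures" key.
        for failure in summary.get("failures") or []:
            if isinstance(failure, dict):
                counts[failure.get("failureType", "unknown")] += 1
    return counts
```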

query_prod_new_connector_releases

Hints: read-only · idempotent

List recently published connector versions.

Returns connector versions published within the specified number of days. Uses last_published timestamp which reflects when the version was actually deployed to the registry (not the changelog date).

Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at

Parameters:

Name Type Required Default Description
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
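
To get a quick overview of which connectors shipped in the window, the rows can be grouped by repository. This sketch relies only on the documented `docker_repository` and `docker_image_tag` keys; the helper name is hypothetical.

```python
from collections import defaultdict


def tags_by_repository(releases: list[dict]) -> dict[str, list[str]]:
    """Group released image tags under their Docker repository."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for row in releases:
        grouped[row["docker_repository"]].append(row["docker_image_tag"])
    # Return a plain dict so missing keys raise KeyError as usual.
    return dict(grouped)
```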

query_prod_recent_syncs_for_connector

Hints: read-only · idempotent · open-world

List recent sync jobs for ALL actors using a connector type.

This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.

Results are always enriched with customer_tier and is_eu fields. A tier filter always applies, which keeps every query tier-aware: customer_tier_filter defaults to 'TIER_2', and 'ALL' includes every tier.

Use this tool to:

  • Find healthy connections with recent successful syncs (status_filter='succeeded')
  • Investigate connector issues across all users (status_filter='failed')
  • Get an overview of all recent sync activity (status_filter='all')

Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.

Key fields in results:

  • job_status: 'succeeded', 'failed', 'cancelled', etc.
  • connection_id, connection_name: The connection that ran the sync
  • actor_id, actor_name: The source or destination actor
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs). |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
    },
    "status_filter": {
      "description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
      "type": "boolean"
    }
  },
  "type": "object"
}
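
The input schema marks all four connector selectors as optional, yet the descriptions require exactly one of them. A caller might check that rule before sending a request; the following is a client-side sketch only. The helper name and the decision to drop null arguments are assumptions, and the server may validate differently.

```python
from typing import Any

SELECTORS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)

# Defaults copied from the parameter table above.
DEFAULTS: dict[str, Any] = {
    "status_filter": "all",
    "organization_id": None,
    "lookback_days": 7,
    "limit": 100,
    "customer_tier_filter": "TIER_2",
    "exclude_pinned": False,
}


def build_recent_syncs_args(**kwargs: Any) -> dict[str, Any]:
    """Validate and assemble tool arguments (hypothetical client-side helper)."""
    unknown = set(kwargs) - set(SELECTORS) - set(DEFAULTS)
    if unknown:
        # The schema sets additionalProperties to false.
        raise ValueError(f"Unknown argument(s): {sorted(unknown)}")
    provided = [name for name in SELECTORS if kwargs.get(name) is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Provide exactly one of {SELECTORS}; got {provided or 'none'}"
        )
    args = dict(DEFAULTS)
    args.update(kwargs)
    # Drop null entries so only meaningful arguments are sent.
    return {key: value for key, value in args.items() if value is not None}
```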

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
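
For the 'prove fix' workflow described above, the rows can also be narrowed client-side to connections that synced successfully and are not pinned. This is a sketch over the documented result fields (`job_status`, `pinned_version_id`, `connection_id`); the helper name is hypothetical, and passing exclude_pinned=True with status_filter='succeeded' achieves a similar effect on the server.

```python
def healthy_unpinned_connections(rows: list[dict]) -> list[str]:
    """Return distinct connection IDs with a successful sync and no version pin."""
    seen: set[str] = set()
    result: list[str] = []
    for row in rows:
        if row.get("job_status") != "succeeded":
            continue
        if row.get("pinned_version_id") is not None:
            continue  # The actor is effectively pinned at some scope.
        connection_id = row.get("connection_id")
        if connection_id is not None and connection_id not in seen:
            seen.add(connection_id)
            result.append(connection_id)
    return result
```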

query_prod_recent_syncs_for_version_pinned_connector

Hints: read-only · idempotent

List sync job results for actors PINNED to a specific connector version.

IMPORTANT: This tool ONLY returns results for actors that are effectively pinned to the specified version via scoped_configuration at any scope level (actor, workspace, or organization). Most connections run unpinned and will NOT appear in these results.

Use this tool when you want to monitor rollout health for actors that have been pinned to a pre-release or specific version. For finding healthy connections across ALL actors using a connector type (regardless of pinning), use query_prod_recent_syncs_for_connector instead.

The actor_id field identifies the actor, which may be either a source or a destination (a superset of source_id/destination_id).

Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, connector_definition_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector_version_id | string | yes | | Connector version UUID to find sync results for |
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| successful_only | boolean | no | false | If True, only return successful syncs (default: False) |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find sync results for",
      "type": "string"
    },
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "successful_only": {
      "default": false,
      "description": "If True, only return successful syncs (default: False)",
      "type": "boolean"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
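
When monitoring rollout health, a per-scope success rate is often easier to read than raw rows. The sketch below uses only the documented `pin_scope_type` and `job_status` keys and assumes 'succeeded' is the success status, as in the sibling tool's result description; the helper name is hypothetical.

```python
from collections import defaultdict
from typing import Optional


def success_rate_by_scope(rows: list[dict]) -> dict[Optional[str], float]:
    """Compute the share of succeeded jobs for each pin_scope_type."""
    totals: dict[Optional[str], int] = defaultdict(int)
    successes: dict[Optional[str], int] = defaultdict(int)
    for row in rows:
        scope = row.get("pin_scope_type")
        totals[scope] += 1
        if row.get("job_status") == "succeeded":
            successes[scope] += 1
    # Every scope in totals has at least one row, so division is safe.
    return {scope: successes[scope] / totals[scope] for scope in totals}
```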

query_prod_workspace_info

Hints: read-only · idempotent

Get workspace information including dataplane group.

Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.

Returns dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone. Returns None if the workspace is not found.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | | Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    }
  },
  "required": [
    "workspace_id"
  ],
  "type": "object"
}
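
The alias handling can be pictured as a lookup from alias to UUID. The sketch below is a stand-in for illustration, not the module's `WorkspaceAliasEnum.resolve()`; the dict and function names are hypothetical, while the alias and UUID pair comes from the schema above.

```python
# Alias-to-UUID pair taken from the parameter description and enum above.
WORKSPACE_ALIASES = {
    "@devin-ai-sandbox": "266ebdfe-0d7b-4540-9817-de7e4505ba61",
}


def resolve_workspace_id(value: str) -> str:
    """Return the UUID for a known alias, or the value unchanged otherwise."""
    if value.startswith("@"):
        try:
            return WORKSPACE_ALIASES[value]
        except KeyError:
            raise ValueError(f"Unknown workspace alias: {value}") from None
    # Anything that is not an alias is assumed to already be a UUID.
    return value
```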

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
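
Since the result may be null, callers need to handle the not-found case before reading dataplane_name. The sketch below assumes the EU dataplane is named 'EU', which matches the examples elsewhere in this reference but is not guaranteed; the helper name is hypothetical.

```python
from typing import Optional


def is_eu_workspace(info: Optional[dict]) -> Optional[bool]:
    """Return True/False for a found workspace, or None when it cannot be determined."""
    if info is None:
        return None  # Workspace not found.
    name = info.get("dataplane_name")
    if name is None:
        return None  # No dataplane recorded for this workspace.
    # ASSUMPTION: the EU region's dataplane is named "EU".
    return name.upper() == "EU"
```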

query_prod_workspaces_by_email_domain

Hints: read-only · idempotent

Find workspaces by email domain.

This tool searches for workspaces where users have email addresses matching the specified domain. This is useful for identifying workspaces belonging to specific companies; for example, searching for "motherduck.com" will find workspaces belonging to MotherDuck employees.

Use cases:

  • Finding partner organization connections for testing connector fixes
  • Identifying internal test accounts for specific integrations
  • Locating workspaces belonging to technology partners

The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.

Parameters:

Name Type Required Default Description
email_domain string yes Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.
limit integer no 100 Maximum number of workspaces to return (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "email_domain": {
      "description": "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). Do not include the '@' symbol. This will find workspaces where users have email addresses with this domain.",
      "type": "string"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of workspaces to return (default: 100)",
      "type": "integer"
    }
  },
  "required": [
    "email_domain"
  ],
  "type": "object"
}
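
Because email_domain must not include the '@' symbol, a caller may want to clean user input first. The following is a hypothetical client-side helper; lowercasing and the requirement that the domain contain a dot are assumptions, not documented server behavior.

```python
def normalize_email_domain(raw: str) -> str:
    """Strip whitespace and a leading '@', rejecting anything that is not a bare domain."""
    domain = raw.strip().lower().lstrip("@")
    if not domain or "@" in domain or "." not in domain:
        # Full addresses such as 'user@example.com' are rejected rather than guessed at.
        raise ValueError(f"Expected a bare email domain, got: {raw!r}")
    return domain
```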

Show output JSON schema

{
  "description": "Result of looking up workspaces by email domain.",
  "properties": {
    "email_domain": {
      "description": "The email domain that was searched for (e.g., 'motherduck.com')",
      "type": "string"
    },
    "total_workspaces_found": {
      "description": "Total number of workspaces matching the email domain",
      "type": "integer"
    },
    "unique_organization_ids": {
      "description": "List of unique organization IDs found",
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "workspaces": {
      "description": "List of workspaces matching the email domain",
      "items": {
        "description": "Information about a workspace found by email domain search.",
        "properties": {
          "organization_id": {
            "description": "The organization UUID",
            "type": "string"
          },
          "workspace_id": {
            "description": "The workspace UUID",
            "type": "string"
          },
          "workspace_name": {
            "description": "The name of the workspace",
            "type": "string"
          },
          "slug": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The workspace slug (URL-friendly identifier)"
          },
          "email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The email address associated with the workspace"
          },
          "dataplane_group_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The dataplane group UUID (region)"
          },
          "dataplane_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The name of the dataplane (e.g., 'US', 'EU')"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the workspace was created"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
          },
          "is_eu": {
            "anyOf": [
              {
                "type": "boolean"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Whether the workspace is in the EU region (derived from dataplane_name)."
          }
        },
        "required": [
          "organization_id",
          "workspace_id",
          "workspace_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "email_domain",
    "total_workspaces_found",
    "unique_organization_ids",
    "workspaces"
  ],
  "type": "object"
}
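
To show how a result shaped like the output schema above might be consumed, here is a minimal sketch that groups workspace IDs by organization and skips workspaces flagged as EU. The sample payload is hand-written and every value in it is made up; `non_eu_workspaces_by_org` is a hypothetical helper, not a function from this module.

```python
from collections import defaultdict
from typing import Any

# Hand-written sample shaped like the output schema; all values are made up.
sample_result: dict[str, Any] = {
    "email_domain": "example.com",
    "total_workspaces_found": 3,
    "unique_organization_ids": ["org-a", "org-b"],
    "workspaces": [
        {"organization_id": "org-a", "workspace_id": "ws-1",
         "workspace_name": "Prod", "is_eu": False},
        {"organization_id": "org-a", "workspace_id": "ws-2",
         "workspace_name": "Staging", "is_eu": True},
        # Optional fields such as is_eu may be absent or null.
        {"organization_id": "org-b", "workspace_id": "ws-3",
         "workspace_name": "Sandbox"},
    ],
}


def non_eu_workspaces_by_org(result: dict[str, Any]) -> dict[str, list[str]]:
    """Group workspace IDs by organization, skipping workspaces flagged as EU."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for ws in result["workspaces"]:
        # Only an explicit True excludes a workspace; None/missing is kept.
        if ws.get("is_eu") is True:
            continue
        grouped[ws["organization_id"]].append(ws["workspace_id"])
    return dict(grouped)


print(non_eu_workspaces_by_org(sample_result))
```

Only the three required fields are read unconditionally. Everything else is accessed with `.get()` because the schema marks those fields as nullable with a `null` default.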

# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from
airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

## MCP reference

.. include:: ../../../docs/mcp-generated/prod_db_queries.md
    :start-line: 2
"""

from __future__ import annotations

__all__: list[str] = []

import json
from datetime import datetime, timezone
from enum import StrEnum
from typing import Annotated, Any

from airbyte.exceptions import PyAirbyteInputError
from fastmcp import FastMCP
from fastmcp_extensions import mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte_ops_mcp.cloud_admin.registry_lookup import (
    resolve_canonical_name_to_definition_id,
)
from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum
from airbyte_ops_mcp.prod_db_access.queries import (
    query_actors_pinned_to_version,
    query_connections_by_connector,
    query_connections_by_destination_connector,
    query_connections_by_stream,
    query_connector_rollouts,
    query_connector_versions,
    query_dataplanes_list,
    query_destination_connection_stats,
    query_failed_sync_attempts_for_connector,
    query_new_connector_releases,
    query_recent_syncs_for_connector,
    query_source_connection_stats,
    query_syncs_for_version_pinned_connector,
    query_workspace_info,
    query_workspaces_by_email_domain,
)
from airbyte_ops_mcp.tier_cache import (
    TierFilter,
    enrich_rows_by_org,
    filter_rows_by_tier,
    get_org_tiers,
)


class StatusFilter(StrEnum):
    """Filter for job status in sync queries."""

    ALL = "all"
    SUCCEEDED = "succeeded"
    FAILED = "failed"


# Cloud UI base URL for building connection URLs
CLOUD_UI_BASE_URL = "https://cloud.airbyte.com"


# =============================================================================
# Pydantic Models for MCP Tool Responses
# =============================================================================


class WorkspaceInfo(BaseModel):
    """Information about a workspace found by email domain search."""

    organization_id: str = Field(description="The organization UUID")
    workspace_id: str = Field(description="The workspace UUID")
    workspace_name: str = Field(description="The name of the workspace")
    slug: str | None = Field(
        default=None, description="The workspace slug (URL-friendly identifier)"
    )
    email: str | None = Field(
        default=None, description="The email address associated with the workspace"
    )
    dataplane_group_id: str | None = Field(
        default=None, description="The dataplane group UUID (region)"
    )
    dataplane_name: str | None = Field(
        default=None, description="The name of the dataplane (e.g., 'US', 'EU')"
    )
    created_at: datetime | None = Field(
        default=None, description="When the workspace was created"
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
    )
    is_eu: bool | None = Field(
        default=None,
        description="Whether the workspace is in the EU region (derived from dataplane_name).",
    )


class WorkspacesByEmailDomainResult(BaseModel):
    """Result of looking up workspaces by email domain."""

    email_domain: str = Field(
        description="The email domain that was searched for (e.g., 'motherduck.com')"
    )
    total_workspaces_found: int = Field(
        description="Total number of workspaces matching the email domain"
    )
    unique_organization_ids: list[str] = Field(
        description="List of unique organization IDs found"
    )
    workspaces: list[WorkspaceInfo] = Field(
        description="List of workspaces matching the email domain"
    )


class LatestAttemptBreakdown(BaseModel):
    """Breakdown of connections by latest attempt status."""

    succeeded: int = Field(
        default=0, description="Connections where latest attempt succeeded"
    )
    failed: int = Field(
        default=0, description="Connections where latest attempt failed"
    )
    cancelled: int = Field(
        default=0, description="Connections where latest attempt was cancelled"
    )
    running: int = Field(
        default=0, description="Connections where latest attempt is still running"
    )
    unknown: int = Field(
        default=0,
        description="Connections with no recent attempts in the lookback window",
    )


class VersionPinStats(BaseModel):
    """Stats for connections pinned to a specific version."""

    pinned_version_id: str | None = Field(
        description="The connector version UUID (None for unpinned connections)"
    )
    docker_image_tag: str | None = Field(
        default=None, description="The docker image tag for this version"
    )
    total_connections: int = Field(description="Total number of connections")
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Breakdown by latest attempt status"
    )


class ConnectorConnectionStats(BaseModel):
    """Aggregate connection stats for a connector."""

    connector_definition_id: str = Field(description="The connector definition UUID")
    connector_type: str = Field(description="'source' or 'destination'")
    canonical_name: str | None = Field(
        default=None, description="The canonical connector name if resolved"
    )
    total_connections: int = Field(
        description="Total number of non-deprecated connections"
    )
    enabled_connections: int = Field(
        description="Number of enabled (active status) connections"
    )
    active_connections: int = Field(
        description="Number of connections with recent sync activity"
    )
    pinned_connections: int = Field(
        description="Number of connections with explicit version pins"
    )
    unpinned_connections: int = Field(
        description="Number of connections on default version"
    )
    latest_attempt: LatestAttemptBreakdown = Field(
        description="Overall breakdown by latest attempt status"
    )
    by_version: list[VersionPinStats] = Field(
        description="Stats broken down by pinned version"
    )


class ConnectorConnectionStatsResponse(BaseModel):
    """Response containing connection stats for multiple connectors."""

    sources: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for source connectors"
    )
    destinations: list[ConnectorConnectionStats] = Field(
        default_factory=list, description="Stats for destination connectors"
    )
    lookback_days: int = Field(
        description="Lookback window used for 'active' connections"
    )
    generated_at: datetime = Field(description="When this response was generated")


def _opt_str(value: Any) -> str | None:
    """Convert a nullable value to str, returning None if the value is None/falsy."""
    return str(value) if value else None


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_dataplanes() -> list[dict[str, Any]]:
    """List all dataplane groups with workspace counts.

    Returns information about all active dataplane groups in Airbyte Cloud,
    including the number of workspaces in each. Useful for understanding
    the distribution of workspaces across regions (US, US-Central, EU).

    Returns list of dicts with keys: dataplane_group_id, dataplane_name,
    organization_id, enabled, tombstone, created_at, workspace_count
    """
    return query_dataplanes_list()


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspace_info(
    workspace_id: Annotated[
        str | WorkspaceAliasEnum,
        Field(
            description="Workspace UUID or alias to look up. "
            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
        ),
    ],
) -> dict[str, Any] | None:
    """Get workspace information including dataplane group.

    Returns details about a specific workspace, including which dataplane
    (region) it belongs to. Useful for determining if a workspace is in
    the EU region for filtering purposes.

    Returns dict with keys: workspace_id, workspace_name, slug, organization_id,
    dataplane_group_id, dataplane_name, created_at, tombstone
    Or None if workspace not found.
    """
    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required

    return query_workspace_info(resolved_workspace_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_versions(
    connector_definition_id: Annotated[
        str,
        Field(description="Connector definition UUID to list versions for"),
    ],
) -> list[dict[str, Any]]:
    """List all versions for a connector definition.

    Returns all published versions of a connector, ordered by last_published
    date descending. Useful for understanding version history and finding
    specific version IDs for pinning or rollout monitoring.

    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
    release_stage, support_level, cdk_version, language, last_published, release_date
    """
    return query_connector_versions(connector_definition_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_new_connector_releases(
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
) -> list[dict[str, Any]]:
    """List recently published connector versions.

    Returns connector versions published within the specified number of days.
    Uses last_published timestamp which reflects when the version was actually
    deployed to the registry (not the changelog date).

    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
    docker_image_tag, last_published, release_date, release_stage, support_level,
    cdk_version, language, created_at
    """
    return query_new_connector_releases(days=days, limit=limit)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_actors_by_pinned_connector_version(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find pinned instances for"),
    ],
) -> list[dict[str, Any]]:
    """List actors (sources/destinations) effectively pinned to a specific connector version.

    Returns all actors that are effectively pinned to a specific connector version,
    considering all scope levels: actor-level pins, workspace-level pins, and
    organization-level pins (with actor > workspace > organization precedence).
    Useful for monitoring rollouts and understanding which customers are affected.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
    origin, description, created_at, expires_at, pin_scope_type, actor_name,
    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_actors_pinned_to_version(connector_version_id)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_recent_syncs_for_version_pinned_connector(
    connector_version_id: Annotated[
        str,
        Field(description="Connector version UUID to find sync results for"),
    ],
    days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    successful_only: Annotated[
        bool,
        Field(
            description="If True, only return successful syncs (default: False)",
            default=False,
        ),
    ] = False,
) -> list[dict[str, Any]]:
    """List sync job results for actors PINNED to a specific connector version.

    IMPORTANT: This tool ONLY returns results for actors that are effectively
    pinned to the specified version via scoped_configuration at any scope level
    (actor, workspace, or organization). Most connections run unpinned and will
    NOT appear in these results.

    Use this tool when you want to monitor rollout health for actors that have been
    pinned to a pre-release or specific version. For finding healthy connections
    across ALL actors using a connector type (regardless of pinning),
    use query_prod_recent_syncs_for_connector instead.

    The actor_id field is the actor ID (superset of source_id/destination_id).

    Returns list of dicts with keys: job_id, connection_id, job_status, started_at,
    job_updated_at, connection_name, actor_id, actor_name, connector_definition_id,
    pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name,
    organization_id, dataplane_group_id, dataplane_name

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from.
    """
    return query_syncs_for_version_pinned_connector(
        connector_version_id,
        days=days,
        limit=limit,
        successful_only=successful_only,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_recent_syncs_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id OR destination_definition_id "
                "OR destination_canonical_name (exactly one required). "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ],
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Provide this OR destination_canonical_name OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ],
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Provide this OR destination_definition_id OR source_definition_id "
                "OR source_canonical_name (exactly one required). "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ],
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
                "Use 'succeeded' to find healthy connections with recent successful syncs. "
                "Use 'failed' to find connections with recent failures."
            ),
            default=StatusFilter.ALL,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only syncs from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude syncs for actors that are already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all syncs)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """List recent sync jobs for ALL actors using a connector type.

    This tool finds all actors with the given connector definition and returns their
    recent sync jobs, regardless of whether they have explicit version pins. It filters
    out deleted actors, deleted workspaces, and deprecated connections.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use this tool to:
    - Find healthy connections with recent successful syncs (status_filter='succeeded')
    - Investigate connector issues across all users (status_filter='failed')
    - Get an overview of all recent sync activity (status_filter='all')

    Set exclude_pinned=True to filter out syncs for actors that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
    source_definition_id, source_canonical_name, destination_definition_id,
    or destination_canonical_name.

    Key fields in results:
    - job_status: 'succeeded', 'failed', 'cancelled', etc.
    - connection_id, connection_name: The connection that ran the sync
    - actor_id, actor_name: The source or destination actor
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one connector parameter is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a destination connector
    is_destination = (
        destination_definition_id is not None or destination_canonical_name is not None
    )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif destination_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    else:
        # We've validated exactly one param is provided, so this must be set
        assert destination_definition_id is not None
        resolved_definition_id = destination_definition_id

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_recent_syncs_for_connector(
        connector_definition_id=resolved_definition_id,
        is_destination=is_destination,
        status_filter=status_filter,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
        exclude_pinned=exclude_pinned,
    )

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_failed_sync_attempts_for_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of this or source_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of this or source_definition_id is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only failed attempts from this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(description="Number of days to look back (default: 7)", default=7),
    ] = 7,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ] = 100,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """List failed sync attempts for ALL actors using a source connector type.

    This tool finds all actors with the given connector definition and returns their
    failed sync attempts, regardless of whether they have explicit version pins.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    This is useful for investigating connector issues across all users. Use this when
    you want to find failures for a connector type regardless of which version users
    are on.

    Note: This tool only supports SOURCE connectors. For destination connectors,
    a separate tool would be needed.

    Key fields in results:
    - failure_summary: JSON containing failure details including failureType and messages
    - customer_tier: TIER_0, TIER_1, or TIER_2
    - is_eu: Whether the workspace is in the EU region
    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
    """
    # Validate that exactly one of the two parameters is provided
    if (source_definition_id is None) == (source_canonical_name is None):
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided, but not both."
            ),
        )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
 763    ] = None,
 764    limit: Annotated[
 765        int,
 766        Field(description="Maximum number of results (default: 1000)", default=1000),
 767    ] = 1000,
 768    customer_tier_filter: Annotated[
 769        TierFilter,
 770        Field(
 771            description=(
 772                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: 'TIER_2'). "
 773                "Filters results to only include connections belonging to organizations "
 774                "in the specified tier. Use 'ALL' to include all tiers."
 775            ),
 776        ),
 777    ] = "TIER_2",
 778    *,
 779    exclude_pinned: Annotated[
 780        bool,
 781        Field(
 782            description=(
 783                "If True, exclude connections whose connector is already pinned to a "
 784                "specific version (at any scope level: actor, workspace, or organization). "
 785                "Useful for 'prove fix' workflows where you want to find unpinned "
 786                "connections for live testing. Default: False (include all connections)."
 787            ),
 788            default=False,
 789        ),
 790    ],
 791) -> list[dict[str, Any]]:
 792    """Search for all connections using a specific source or destination connector type.
 793
 794    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
 795    It finds all connections where the source or destination connector matches the
 796    specified type, regardless of how the connector is named by users.
 797
 798    Results are always enriched with customer_tier and is_eu fields, then filtered by
 799    customer_tier_filter, which defaults to 'TIER_2'; pass 'ALL' to include every tier.
 800
 801    Optionally filter by organization_id to limit results to a specific organization.
 802    Use '@airbyte-internal' as an alias for the Airbyte internal organization.
 803
 804    Set exclude_pinned=True to filter out connections that are already pinned to a
 805    specific version. This is useful for 'prove fix' live connection testing workflows
 806    where you want to find unpinned connections to test against.
 807
 808    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
 809    For source queries, returns: connection_id, connection_name, connection_url, source_id,
 810    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
 811    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
 812    pin_scope_type, customer_tier, is_eu.
 813    For destination queries, returns: connection_id, connection_name, connection_url,
 814    destination_id, destination_name, destination_definition_id, workspace_id,
 815    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
 816    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
 817
 818    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
 819    level the effective pin came from (NULL if not pinned).
 820    """
 821    # Validate that exactly one of the four connector parameters is provided
 822    provided_params = [
 823        source_definition_id,
 824        source_canonical_name,
 825        destination_definition_id,
 826        destination_canonical_name,
 827    ]
 828    num_provided = sum(p is not None for p in provided_params)
 829    if num_provided != 1:
 830        raise PyAirbyteInputError(
 831            message=(
 832                "Exactly one of source_definition_id, source_canonical_name, "
 833                "destination_definition_id, or destination_canonical_name must be provided."
 834            ),
 835        )
 836
 837    # Determine if this is a source or destination query and resolve the definition ID
 838    is_source_query = (
 839        source_definition_id is not None or source_canonical_name is not None
 840    )
 841    resolved_definition_id: str
 842
 843    if source_canonical_name is not None:
 844        resolved_definition_id = resolve_canonical_name_to_definition_id(
 845            canonical_name=source_canonical_name,
 846        )
 847    elif source_definition_id is not None:
 848        resolved_definition_id = source_definition_id
 849    elif destination_canonical_name is not None:
 850        resolved_definition_id = resolve_canonical_name_to_definition_id(
 851            canonical_name=destination_canonical_name,
 852        )
 853    else:
 854        resolved_definition_id = destination_definition_id  # type: ignore[assignment]
 855
 856    # Resolve organization ID alias
 857    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
 858
 859    # Query the database based on connector type
 860    if is_source_query:
 861        rows = [
 862            {
 863                "organization_id": str(row.get("organization_id", "")),
 864                "workspace_id": str(row["workspace_id"]),
 865                "workspace_name": row.get("workspace_name", ""),
 866                "connection_id": str(row["connection_id"]),
 867                "connection_name": row.get("connection_name", ""),
 868                "connection_url": (
 869                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
 870                    f"/connections/{row['connection_id']}/status"
 871                ),
 872                "source_id": str(row["source_id"]),
 873                "source_name": row.get("source_name", ""),
 874                "source_definition_id": str(row["source_definition_id"]),
 875                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
 876                "dataplane_name": row.get("dataplane_name", ""),
 877                "pin_origin_type": row.get("pin_origin_type"),
 878                "pin_origin": row.get("pin_origin"),
 879                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
 880                "pin_scope_type": row.get("pin_scope_type"),
 881            }
 882            for row in query_connections_by_connector(
 883                connector_definition_id=resolved_definition_id,
 884                organization_id=resolved_organization_id,
 885                limit=limit,
 886                exclude_pinned=exclude_pinned,
 887            )
 888        ]
 889    else:
 890        # Destination query
 891        rows = [
 892            {
 893                "organization_id": str(row.get("organization_id", "")),
 894                "workspace_id": str(row["workspace_id"]),
 895                "workspace_name": row.get("workspace_name", ""),
 896                "connection_id": str(row["connection_id"]),
 897                "connection_name": row.get("connection_name", ""),
 898                "connection_url": (
 899                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
 900                    f"/connections/{row['connection_id']}/status"
 901                ),
 902                "destination_id": str(row["destination_id"]),
 903                "destination_name": row.get("destination_name", ""),
 904                "destination_definition_id": str(row["destination_definition_id"]),
 905                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
 906                "dataplane_name": row.get("dataplane_name", ""),
 907                "pin_origin_type": row.get("pin_origin_type"),
 908                "pin_origin": row.get("pin_origin"),
 909                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
 910                "pin_scope_type": row.get("pin_scope_type"),
 911            }
 912            for row in query_connections_by_destination_connector(
 913                connector_definition_id=resolved_definition_id,
 914                organization_id=resolved_organization_id,
 915                limit=limit,
 916                exclude_pinned=exclude_pinned,
 917            )
 918        ]
 919
 920    enriched = enrich_rows_by_org(rows)
 921    return filter_rows_by_tier(enriched, customer_tier_filter)
 922
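The "exactly one of four" selector check used by `query_prod_connections_by_connector` can be tried in isolation. The sketch below is a minimal standalone restatement, not the module's code: `require_exactly_one` is a hypothetical helper name, and the built-in `ValueError` stands in for `PyAirbyteInputError`.

```python
def require_exactly_one(**params: object) -> str:
    """Return the name of the single non-None parameter, or raise ValueError."""
    provided = [name for name, value in params.items() if value is not None]
    if len(provided) != 1:
        raise ValueError(
            f"Exactly one of {', '.join(params)} must be provided; "
            f"got {len(provided)}."
        )
    return provided[0]


# Only the canonical source name is set, so the check passes.
chosen = require_exactly_one(
    source_definition_id=None,
    source_canonical_name="source-youtube-analytics",
    destination_definition_id=None,
    destination_canonical_name=None,
)
```

Like the listing, this counts `is not None` rather than truthiness, so an empty string still counts as "provided".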
 923
 924@mcp_tool(
 925    read_only=True,
 926    idempotent=True,
 927    open_world=True,
 928)
 929def query_prod_connections_by_stream(
 930    stream_name: Annotated[
 931        str,
 932        Field(
 933            description=(
 934                "Name of the stream to search for in connection catalogs. "
 935                "This must match the exact stream name as configured in the connection. "
 936                "Examples: 'global_exclusions', 'campaigns', 'users'."
 937            ),
 938        ),
 939    ],
 940    source_definition_id: Annotated[
 941        str | None,
 942        Field(
 943            description=(
 944                "Source connector definition ID (UUID) to search for. "
 945                "Provide this OR source_canonical_name (exactly one required). "
 946                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
 947            ),
 948            default=None,
 949        ),
 950    ],
 951    source_canonical_name: Annotated[
 952        str | None,
 953        Field(
 954            description=(
 955                "Canonical source connector name to search for. "
 956                "Provide this OR source_definition_id (exactly one required). "
 957                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
 958            ),
 959            default=None,
 960        ),
 961    ],
 962    organization_id: Annotated[
 963        str | OrganizationAliasEnum | None,
 964        Field(
 965            description=(
 966                "Optional organization ID (UUID) or alias to filter results. "
 967                "If provided, only connections in this organization will be returned. "
 968                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
 969            ),
 970            default=None,
 971        ),
 972    ],
 973    limit: Annotated[
 974        int,
 975        Field(description="Maximum number of results (default: 100)", default=100),
 976    ],
 977    customer_tier_filter: Annotated[
 978        TierFilter,
 979        Field(
 980            description=(
 981                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL' (default: 'TIER_2'). "
 982                "Filters results to only include connections belonging to organizations "
 983                "in the specified tier. Use 'ALL' to include all tiers."
 984            ),
 985        ),
 986    ] = "TIER_2",
 987) -> list[dict[str, Any]]:
 988    """Find connections that have a specific stream enabled in their catalog.
 989
 990    This tool searches the connection's configured catalog (JSONB) for streams
 991    matching the specified name. It's particularly useful when validating
 992    connector fixes that affect specific streams - you can quickly find
 993    customer connections that use the affected stream.
 994
 995    Results are always enriched with customer_tier and is_eu fields, then filtered by
 996    customer_tier_filter, which defaults to 'TIER_2'; pass 'ALL' to include every tier.
 997
 998    Use cases:
 999    - Finding connections with a specific stream enabled for regression testing
1000    - Validating connector fixes that affect particular streams
1001    - Identifying which customers use rarely-enabled streams
1002
1003    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
1004    """
1005    provided_params = [source_definition_id, source_canonical_name]
1006    num_provided = sum(p is not None for p in provided_params)
1007    if num_provided != 1:
1008        raise PyAirbyteInputError(
1009            message=(
1010                "Exactly one of source_definition_id or source_canonical_name "
1011                "must be provided."
1012            ),
1013        )
1014
1015    resolved_definition_id: str
1016    if source_canonical_name is not None:
1017        resolved_definition_id = resolve_canonical_name_to_definition_id(
1018            canonical_name=source_canonical_name,
1019        )
1020    else:
1021        assert source_definition_id is not None
1022        resolved_definition_id = source_definition_id
1023
1024    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
1025
1026    rows = [
1027        {
1028            "organization_id": str(row.get("organization_id", "")),
1029            "workspace_id": str(row["workspace_id"]),
1030            "workspace_name": row.get("workspace_name", ""),
1031            "connection_id": str(row["connection_id"]),
1032            "connection_name": row.get("connection_name", ""),
1033            "connection_status": row.get("connection_status", ""),
1034            "connection_url": (
1035                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
1036                f"/connections/{row['connection_id']}/status"
1037            ),
1038            "source_id": str(row["source_id"]),
1039            "source_name": row.get("source_name", ""),
1040            "source_definition_id": str(row["source_definition_id"]),
1041            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
1042            "dataplane_name": row.get("dataplane_name", ""),
1043        }
1044        for row in query_connections_by_stream(
1045            connector_definition_id=resolved_definition_id,
1046            stream_name=stream_name,
1047            organization_id=resolved_organization_id,
1048            limit=limit,
1049        )
1050    ]
1051    enriched = enrich_rows_by_org(rows)
1052    return filter_rows_by_tier(enriched, customer_tier_filter)
1053
1054
1055@mcp_tool(
1056    read_only=True,
1057    idempotent=True,
1058)
1059def query_prod_workspaces_by_email_domain(
1060    email_domain: Annotated[
1061        str,
1062        Field(
1063            description=(
1064                "Email domain to search for (e.g., 'motherduck.com', 'fivetran.com'). "
1065                "Do not include the '@' symbol. This will find workspaces where users "
1066                "have email addresses with this domain."
1067            ),
1068        ),
1069    ],
1070    limit: Annotated[
1071        int,
1072        Field(
1073            description="Maximum number of workspaces to return (default: 100)",
1074            default=100,
1075        ),
1076    ] = 100,
1077) -> WorkspacesByEmailDomainResult:
1078    """Find workspaces by email domain.
1079
1080    This tool searches for workspaces where users have email addresses matching
1081    the specified domain. This is useful for identifying workspaces belonging to
1082    specific companies - for example, searching for "motherduck.com" will find
1083    workspaces belonging to MotherDuck employees.
1084
1085    Use cases:
1086    - Finding partner organization connections for testing connector fixes
1087    - Identifying internal test accounts for specific integrations
1088    - Locating workspaces belonging to technology partners
1089
1090    The returned organization IDs can be used with other tools like
1091    `query_prod_connections_by_connector` to find connections within
1092    those organizations for safe testing.
1093    """
1094    # Strip leading @ if provided
1095    clean_domain = email_domain.lstrip("@")
1096
1097    # Query the database
1098    rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)
1099
1100    # Convert rows to Pydantic models
1101    workspaces = [
1102        WorkspaceInfo(
1103            organization_id=str(row["organization_id"]),
1104            workspace_id=str(row["workspace_id"]),
1105            workspace_name=row.get("workspace_name", ""),
1106            slug=row.get("slug"),
1107            email=row.get("email"),
1108            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
1109            dataplane_name=row.get("dataplane_name"),
1110            created_at=row.get("created_at"),
1111        )
1112        for row in rows
1113    ]
1114
1115    # Enrich with tier annotation (annotation only, no filtering)
1116    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
1117    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
1118    for ws in workspaces:
1119        tier_result = tier_results.get(ws.organization_id)
1120        if tier_result:
1121            ws.customer_tier = tier_result.customer_tier
1122        ws.is_eu = ws.dataplane_name == "EU"
1123
1124    return WorkspacesByEmailDomainResult(
1125        email_domain=clean_domain,
1126        total_workspaces_found=len(workspaces),
1127        unique_organization_ids=unique_org_ids,
1128        workspaces=workspaces,
1129    )
1130
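Two small idioms in `query_prod_workspaces_by_email_domain` are easy to misread: `str.lstrip("@")` and `dict.fromkeys(...)` for de-duplication. The sketch below illustrates both with made-up inputs; `normalize_domain`, `unique_in_order`, and the `org-*` IDs are hypothetical names used only for this example.

```python
def normalize_domain(email_domain: str) -> str:
    # str.lstrip removes every leading "@", not just the first one.
    return email_domain.lstrip("@")


def unique_in_order(org_ids: list[str]) -> list[str]:
    # dict keys keep insertion order, so the first occurrence of each ID wins.
    return list(dict.fromkeys(org_ids))


clean = normalize_domain("@motherduck.com")
unique = unique_in_order(["org-b", "org-a", "org-b", "org-c", "org-a"])
```

Preserving first-seen order keeps `unique_organization_ids` aligned with the order in which workspaces came back from the query, which a plain `set` would not guarantee.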
1131
1132def _build_connector_stats(
1133    connector_definition_id: str,
1134    connector_type: str,
1135    canonical_name: str | None,
1136    rows: list[dict[str, Any]],
1137    version_tags: dict[str, str | None],
1138) -> ConnectorConnectionStats:
1139    """Build ConnectorConnectionStats from query result rows."""
1140    # Aggregate totals across all version groups
1141    total_connections = 0
1142    enabled_connections = 0
1143    active_connections = 0
1144    pinned_connections = 0
1145    unpinned_connections = 0
1146    total_succeeded = 0
1147    total_failed = 0
1148    total_cancelled = 0
1149    total_running = 0
1150    total_unknown = 0
1151
1152    by_version: list[VersionPinStats] = []
1153
1154    for row in rows:
1155        version_id = row.get("pinned_version_id")
1156        row_total = int(row.get("total_connections", 0))
1157        row_enabled = int(row.get("enabled_connections", 0))
1158        row_active = int(row.get("active_connections", 0))
1159        row_pinned = int(row.get("pinned_connections", 0))
1160        row_unpinned = int(row.get("unpinned_connections", 0))
1161        row_succeeded = int(row.get("succeeded_connections", 0))
1162        row_failed = int(row.get("failed_connections", 0))
1163        row_cancelled = int(row.get("cancelled_connections", 0))
1164        row_running = int(row.get("running_connections", 0))
1165        row_unknown = int(row.get("unknown_connections", 0))
1166
1167        total_connections += row_total
1168        enabled_connections += row_enabled
1169        active_connections += row_active
1170        pinned_connections += row_pinned
1171        unpinned_connections += row_unpinned
1172        total_succeeded += row_succeeded
1173        total_failed += row_failed
1174        total_cancelled += row_cancelled
1175        total_running += row_running
1176        total_unknown += row_unknown
1177
1178        by_version.append(
1179            VersionPinStats(
1180                pinned_version_id=str(version_id) if version_id else None,
1181                docker_image_tag=version_tags.get(str(version_id))
1182                if version_id
1183                else None,
1184                total_connections=row_total,
1185                enabled_connections=row_enabled,
1186                active_connections=row_active,
1187                latest_attempt=LatestAttemptBreakdown(
1188                    succeeded=row_succeeded,
1189                    failed=row_failed,
1190                    cancelled=row_cancelled,
1191                    running=row_running,
1192                    unknown=row_unknown,
1193                ),
1194            )
1195        )
1196
1197    return ConnectorConnectionStats(
1198        connector_definition_id=connector_definition_id,
1199        connector_type=connector_type,
1200        canonical_name=canonical_name,
1201        total_connections=total_connections,
1202        enabled_connections=enabled_connections,
1203        active_connections=active_connections,
1204        pinned_connections=pinned_connections,
1205        unpinned_connections=unpinned_connections,
1206        latest_attempt=LatestAttemptBreakdown(
1207            succeeded=total_succeeded,
1208            failed=total_failed,
1209            cancelled=total_cancelled,
1210            running=total_running,
1211            unknown=total_unknown,
1212        ),
1213        by_version=by_version,
1214    )
1215
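The roll-up in `_build_connector_stats` amounts to summing per-version rows into connector-level totals. A minimal sketch of that step follows, using plain dicts instead of the module's Pydantic models; `sum_version_rows`, `COUNT_KEYS`, and the row values are illustrative assumptions, not data from the replica.

```python
from typing import Any

# Subset of the count columns read from each per-version row.
COUNT_KEYS = (
    "total_connections",
    "enabled_connections",
    "active_connections",
    "succeeded_connections",
    "failed_connections",
)


def sum_version_rows(rows: list[dict[str, Any]]) -> dict[str, int]:
    """Sum each count column across all version groups; missing keys count as 0."""
    totals = dict.fromkeys(COUNT_KEYS, 0)
    for row in rows:
        for key in COUNT_KEYS:
            totals[key] += int(row.get(key, 0))
    return totals


rows = [
    # pinned_version_id of None represents the unpinned group.
    {"pinned_version_id": None, "total_connections": 40, "enabled_connections": 35,
     "active_connections": 30, "succeeded_connections": 28, "failed_connections": 2},
    {"pinned_version_id": "v-1", "total_connections": 5, "enabled_connections": 5,
     "active_connections": 4, "succeeded_connections": 3},
]
totals = sum_version_rows(rows)
```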
1216
1217@mcp_tool(
1218    read_only=True,
1219    idempotent=True,
1220    open_world=True,
1221)
1222def query_prod_connector_connection_stats(
1223    source_definition_ids: Annotated[
1224        list[str] | None,
1225        Field(
1226            description=(
1227                "List of source connector definition IDs (UUIDs) to get stats for. "
1228                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
1229            ),
1230            default=None,
1231        ),
1232    ] = None,
1233    destination_definition_ids: Annotated[
1234        list[str] | None,
1235        Field(
1236            description=(
1237                "List of destination connector definition IDs (UUIDs) to get stats for. "
1238                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
1239            ),
1240            default=None,
1241        ),
1242    ] = None,
1243    lookback_days: Annotated[
1244        int,
1245        Field(
1246            description=(
1247                "Number of days to look back for 'active' connections (default: 7). "
1248                "Connections with sync activity within this window are counted as active."
1249            ),
1250            default=7,
1251        ),
1252    ] = 7,
1253) -> ConnectorConnectionStatsResponse:
1254    """Get aggregate connection stats for multiple connectors.
1255
1256    Returns counts of connections grouped by pinned version for each connector,
1257    including:
1258    - Total, enabled, and active connection counts
1259    - Pinned vs unpinned breakdown
1260    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)
1261
1262    This tool is designed for release monitoring workflows. It allows you to:
1263    1. Query recently released connectors to identify which ones to monitor
1264    2. Get aggregate stats showing how many connections are using each version
1265    3. See health metrics (pass/fail) broken down by version
1266
1267    The `lookback_days` parameter controls the lookback window for:
1268    - Counting 'active' connections (those with recent sync activity)
1269    - Determining 'latest attempt status' (most recent attempt within the window)
1270
1271    Connections with no sync activity in the lookback window will have
1272    'unknown' status in the latest_attempt breakdown.
1273    """
1274    # Initialize empty lists if None
1275    source_ids = source_definition_ids or []
1276    destination_ids = destination_definition_ids or []
1277
1278    if not source_ids and not destination_ids:
1279        raise PyAirbyteInputError(
1280            message=(
1281                "At least one of source_definition_ids or destination_definition_ids "
1282                "must be provided."
1283            ),
1284        )
1285
1286    sources: list[ConnectorConnectionStats] = []
1287    destinations: list[ConnectorConnectionStats] = []
1288
1289    # Process source connectors
1290    for source_def_id in source_ids:
1291        # Get version info for tag lookup
1292        versions = query_connector_versions(source_def_id)
1293        version_tags = {
1294            str(v["version_id"]): v.get("docker_image_tag") for v in versions
1295        }
1296
1297        # Get aggregate stats
1298        rows = query_source_connection_stats(source_def_id, days=lookback_days)
1299
1300        sources.append(
1301            _build_connector_stats(
1302                connector_definition_id=source_def_id,
1303                connector_type="source",
1304                canonical_name=None,
1305                rows=rows,
1306                version_tags=version_tags,
1307            )
1308        )
1309
1310    # Process destination connectors
1311    for dest_def_id in destination_ids:
1312        # Get version info for tag lookup
1313        versions = query_connector_versions(dest_def_id)
1314        version_tags = {
1315            str(v["version_id"]): v.get("docker_image_tag") for v in versions
1316        }
1317
1318        # Get aggregate stats
1319        rows = query_destination_connection_stats(dest_def_id, days=lookback_days)
1320
1321        destinations.append(
1322            _build_connector_stats(
1323                connector_definition_id=dest_def_id,
1324                connector_type="destination",
1325                canonical_name=None,
1326                rows=rows,
1327                version_tags=version_tags,
1328            )
1329        )
1330
1331    return ConnectorConnectionStatsResponse(
1332        sources=sources,
1333        destinations=destinations,
1334        lookback_days=lookback_days,
1335        generated_at=datetime.now(timezone.utc),
1336    )
1337
1338
1339# =============================================================================
1340# Connector Rollout Models and Tools
1341# =============================================================================
1342
1343
1344class ConnectorRolloutInfo(BaseModel):
1345    """Information about a connector rollout."""
1346
1347    rollout_id: str = Field(description="The rollout UUID")
1348    actor_definition_id: str = Field(description="The connector definition UUID")
1349    state: str = Field(
1350        description="Rollout state: initialized, workflow_started, in_progress, "
1351        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
1352    )
1353    initial_rollout_pct: int | None = Field(
1354        default=None, description="Initial rollout percentage"
1355    )
1356    current_target_rollout_pct: int | None = Field(
1357        default=None, description="Current target rollout percentage"
1358    )
1359    final_target_rollout_pct: int | None = Field(
1360        default=None, description="Final target rollout percentage"
1361    )
1362    has_breaking_changes: bool = Field(
1363        description="Whether the RC has breaking changes"
1364    )
1365    max_step_wait_time_mins: int | None = Field(
1366        default=None, description="Maximum wait time between rollout steps in minutes"
1367    )
1368    rollout_strategy: str | None = Field(
1369        default=None, description="Rollout strategy: manual, automated, overridden"
1370    )
1371    updated_by_user_id: str | None = Field(
1372        default=None,
1373        description="User ID recorded as last updating the rollout",
1374    )
1375    updated_by_user_name: str | None = Field(
1376        default=None,
1377        description="Name recorded as last updating the rollout",
1378    )
1379    updated_by_user_email: str | None = Field(
1380        default=None,
1381        description="Email recorded as last updating the rollout",
1382    )
1383    workflow_run_id: str | None = Field(
1384        default=None, description="Temporal workflow run ID"
1385    )
1386    error_msg: str | None = Field(default=None, description="Error message if errored")
1387    failed_reason: str | None = Field(
1388        default=None, description="Reason for failure if failed"
1389    )
1390    paused_reason: str | None = Field(
1391        default=None, description="Reason for pause if paused"
1392    )
1393    tag: str | None = Field(default=None, description="Optional tag for the rollout")
1394    created_at: datetime | None = Field(
1395        default=None, description="When the rollout was created"
1396    )
1397    updated_at: datetime | None = Field(
1398        default=None, description="When the rollout was last updated"
1399    )
1400    completed_at: datetime | None = Field(
1401        default=None, description="When the rollout completed (if terminal)"
1402    )
1403    expires_at: datetime | None = Field(
1404        default=None, description="When the rollout expires"
1405    )
1406    rc_docker_image_tag: str | None = Field(
1407        default=None, description="Docker image tag of the release candidate"
1408    )
1409    rc_docker_repository: str | None = Field(
1410        default=None, description="Docker repository of the release candidate"
1411    )
1412    initial_docker_image_tag: str | None = Field(
1413        default=None, description="Docker image tag of the initial version"
1414    )
1415    initial_docker_repository: str | None = Field(
1416        default=None, description="Docker repository of the initial version"
1417    )
1418    filters: dict[str, Any] | None = Field(
1419        default=None,
1420        description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})",
1421    )
1422    customer_tier: str | None = Field(
1423        default=None,
1424        description="Customer tier targeted by this rollout (extracted from filters), "
1425        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
1426    )
1427
1428
1429def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
1430    """Parse the rollout filters field from a database row.
1431
1432    The filters field may be a JSON string, a dict, or None.
1433    """
1434    if filters_raw is None:
1435        return None
1436    if isinstance(filters_raw, dict):
1437        return filters_raw
1438    if isinstance(filters_raw, str):
1439        try:
1440            parsed = json.loads(filters_raw)
1441            if isinstance(parsed, dict):
1442                return parsed
1443        except (json.JSONDecodeError, TypeError):
1444            pass
1445    return None
1446
1447
def _extract_tier_from_filters(filters_raw: Any) -> str | None:
    """Extract customer tier from rollout filters JSON.

    Supports two formats:
    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`

    The current format takes precedence when both are present. If it lists
    several tiers, they are returned as one comma-separated string
    (e.g. "TIER_0, TIER_1"). Returns None when no tier can be determined.
    """
    parsed = _parse_rollout_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: customerTierFilters list
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                # A single tier is returned as-is; several are comma-joined.
                if isinstance(values, list) and values:
                    return ", ".join(str(v) for v in values)

    # Legacy format: tierFilter dict
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict):
        tier = tier_filter.get("tier")
        if isinstance(tier, str):
            return tier

    return None


def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
    """Convert a database row to a ConnectorRolloutInfo model."""
    return ConnectorRolloutInfo(
        rollout_id=str(row["rollout_id"]),
        actor_definition_id=str(row["actor_definition_id"]),
        state=row["state"],
        initial_rollout_pct=row.get("initial_rollout_pct"),
        current_target_rollout_pct=row.get("current_target_rollout_pct"),
        final_target_rollout_pct=row.get("final_target_rollout_pct"),
        has_breaking_changes=row["has_breaking_changes"],
        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
        rollout_strategy=row.get("rollout_strategy"),
        updated_by_user_id=str(row["updated_by_user_id"])
        if row.get("updated_by_user_id") is not None
        else None,
        updated_by_user_name=row.get("updated_by_user_name"),
        updated_by_user_email=row.get("updated_by_user_email"),
        workflow_run_id=row.get("workflow_run_id"),
        error_msg=row.get("error_msg"),
        failed_reason=row.get("failed_reason"),
        paused_reason=row.get("paused_reason"),
        tag=row.get("tag"),
        created_at=row.get("created_at"),
        updated_at=row.get("updated_at"),
        completed_at=row.get("completed_at"),
        expires_at=row.get("expires_at"),
        rc_docker_image_tag=row.get("rc_docker_image_tag"),
        rc_docker_repository=row.get("rc_docker_repository"),
        initial_docker_image_tag=row.get("initial_docker_image_tag"),
        initial_docker_repository=row.get("initial_docker_repository"),
        filters=_parse_rollout_filters(row.get("filters")),
        customer_tier=_extract_tier_from_filters(row.get("filters")),
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_rollouts(
    actor_definition_id: Annotated[
        str | None,
        Field(description="Connector definition UUID to filter by (optional)"),
    ] = None,
    rollout_id: Annotated[
        str | None,
        Field(description="Specific rollout UUID to look up (optional)"),
    ] = None,
    active_only: Annotated[
        bool,
        Field(description="If true, only return active (non-terminal) rollouts"),
    ] = False,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)"),
    ] = 100,
) -> list[ConnectorRolloutInfo]:
    """Query connector rollouts with flexible filtering.

    Returns rollouts based on the provided filters. If no filters are specified,
    returns all active rollouts. Useful for monitoring rollout status and history.

    Filter behavior:
    - rollout_id: Returns that specific rollout (ignores other filters)
    - active_only: Returns only active (non-terminal) rollouts
    - actor_definition_id: Returns rollouts for that specific connector
    - No filters: Returns all active rollouts (same as active_only=True)
    """
    rows = query_connector_rollouts(
        actor_definition_id=actor_definition_id,
        rollout_id=rollout_id,
        active_only=active_only,
        limit=limit,
    )
    return [_row_to_connector_rollout_info(row) for row in rows]


def register_prod_db_query_tools(app: FastMCP) -> None:
    """Register prod DB query tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)
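
The way `customer_tier` is derived from `filters` can be easier to follow with concrete inputs. The snippet below is a minimal standalone sketch, not the module's code: `parse_filters` and `extract_tier` are hypothetical names that restate the rules of `_parse_rollout_filters` and `_extract_tier_from_filters` so the example runs without the package installed, and the sample payloads are made up for illustration.

```python
from __future__ import annotations

import json
from typing import Any


def parse_filters(filters_raw: Any) -> dict[str, Any] | None:
    """Hypothetical restatement: accept a dict or a JSON object string, else None."""
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None
    return None


def extract_tier(filters_raw: Any) -> str | None:
    """Hypothetical restatement: current format first, then the legacy format."""
    parsed = parse_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: a list of filter entries, of which one is named "TIER".
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and values:
                    return ", ".join(str(v) for v in values)

    # Legacy format: a single nested dict with a string tier.
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict) and isinstance(tier_filter.get("tier"), str):
        return tier_filter["tier"]
    return None


# Illustrative payloads (assumed shapes, taken from the docstring examples).
legacy = '{"tierFilter": {"tier": "TIER_0"}}'
current = {
    "customerTierFilters": [
        {"name": "TIER", "value": ["TIER_1", "TIER_2"], "operator": "IN"}
    ]
}

print(extract_tier(legacy))      # legacy JSON string
print(extract_tier(current))     # current format with two tiers
print(extract_tier("not json"))  # unparseable input
```

Under these assumptions, a legacy JSON string yields its single tier, a current-format dict with several tiers yields them comma-joined, and anything that does not parse to a JSON object yields `None`.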