airbyte_ops_mcp.mcp.prod_db_queries

MCP tools for querying the Airbyte Cloud Prod DB Replica.

This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.

MCP reference

MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 16 tools, 0 prompts, 0 resources.

Tools (16)

query_connector_pin_stats

Hints: read-only · idempotent · open-world

Query connector versions that have at least one scoped configuration pin.

Returns versions from the prod DB that are referenced by at least one scoped_configuration pin (key = 'connector_version'). Each version appears exactly once with per-scope pin breakdown (actor, workspace, org).

If neither filter is provided, returns the global superset across all connectors.

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector_definition_id | string \| null | no | null | Connector definition UUID to filter by (optional). Mutually exclusive with connector_canonical_name. |
| connector_canonical_name | string \| null | no | null | Connector canonical name (e.g. source-postgres) to filter by. Resolved to a definition ID via the registry. Mutually exclusive with connector_definition_id. |
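The two filters are documented as mutually exclusive, and omitting both returns the global superset. A minimal sketch of a client-side check that builds the arguments for query_connector_pin_stats follows; the helper name build_pin_stats_args is hypothetical and not part of the module.

```python
from typing import Dict, Optional


def build_pin_stats_args(
    connector_definition_id: Optional[str] = None,
    connector_canonical_name: Optional[str] = None,
) -> Dict[str, str]:
    """Build tool arguments, rejecting the mutually exclusive combination.

    Hypothetical helper: the real tool performs its own validation.
    """
    if connector_definition_id is not None and connector_canonical_name is not None:
        raise ValueError(
            "connector_definition_id and connector_canonical_name are mutually exclusive"
        )
    args: Dict[str, str] = {}
    if connector_definition_id is not None:
        args["connector_definition_id"] = connector_definition_id
    if connector_canonical_name is not None:
        args["connector_canonical_name"] = connector_canonical_name
    # An empty dict means "no filter": the tool returns all connectors.
    return args


print(build_pin_stats_args(connector_canonical_name="source-postgres"))
```

Leaving both parameters out of the dict, rather than sending explicit nulls, keeps the request minimal; either form should satisfy the input schema, since both fields default to null.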

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector definition UUID to filter by (optional). Mutually exclusive with `connector_canonical_name`."
    },
    "connector_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector canonical name (e.g. `source-postgres`) to filter by. Resolved to a definition ID via the registry. Mutually exclusive with `connector_definition_id`."
    }
  },
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "A connector version that has at least one scoped configuration pin.",
        "properties": {
          "version_id": {
            "description": "The actor_definition_version UUID",
            "type": "string"
          },
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_name": {
            "description": "Human-readable connector name",
            "type": "string"
          },
          "docker_repository": {
            "description": "Docker repository path",
            "type": "string"
          },
          "docker_image_tag": {
            "description": "Docker image tag for this version",
            "type": "string"
          },
          "last_published": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "ISO timestamp when this version was last published"
          },
          "pin_count": {
            "description": "Total number of scoped_configuration rows pinning to this version",
            "type": "integer"
          },
          "breaking_change_pins": {
            "default": 0,
            "description": "Number of actor-scoped pins created by breaking changes",
            "type": "integer"
          },
          "rollout_pins": {
            "default": 0,
            "description": "Number of pins created by connector rollouts",
            "type": "integer"
          },
          "actor_pins": {
            "description": "Number of actor-scoped pins (excludes breaking change and rollout pins)",
            "type": "integer"
          },
          "workspace_pins": {
            "description": "Number of workspace-scoped pins",
            "type": "integer"
          },
          "org_pins": {
            "description": "Number of organization-scoped pins",
            "type": "integer"
          }
        },
        "required": [
          "version_id",
          "connector_definition_id",
          "connector_name",
          "docker_repository",
          "docker_image_tag",
          "pin_count",
          "actor_pins",
          "workspace_pins",
          "org_pins"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
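The output schema wraps the list in a top-level result key and marks last_published, breaking_change_pins, and rollout_pins as optional with defaults. A small sketch of how a caller might unwrap a response and fill in those defaults is shown below; the function name and the sample row are made up for illustration.

```python
from typing import Any, Dict, List

# Defaults taken from the output schema for the non-required fields.
OPTIONAL_DEFAULTS: Dict[str, Any] = {
    "last_published": None,
    "breaking_change_pins": 0,
    "rollout_pins": 0,
}


def unwrap_pin_stats(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return the rows under 'result', with schema defaults applied."""
    return [{**OPTIONAL_DEFAULTS, **row} for row in response["result"]]


# Illustrative response; every value here is invented.
sample_response = {
    "result": [
        {
            "version_id": "00000000-0000-0000-0000-000000000001",
            "connector_definition_id": "00000000-0000-0000-0000-000000000002",
            "connector_name": "Example",
            "docker_repository": "airbyte/source-example",
            "docker_image_tag": "1.2.3",
            "pin_count": 3,
            "actor_pins": 1,
            "workspace_pins": 2,
            "org_pins": 0,
        }
    ]
}

for row in unwrap_pin_stats(sample_response):
    print(row["docker_image_tag"], row["pin_count"], row["rollout_pins"])
```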

query_prod_actors_by_pinned_connector_version

Hints: read-only · idempotent

List actors (sources/destinations) effectively pinned to a specific connector version.

Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.

The actor_id field holds the actor ID, which covers both source IDs and destination IDs.

Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
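The precedence rule described above (actor > workspace > organization) can be illustrated with a short sketch. This is not the query the tool runs; it only shows how an effective pin and its pin_scope_type might be chosen when pins exist at several scope levels. The function name and the input shape are assumptions.

```python
from typing import Dict, Optional, Tuple

# Highest-precedence scope first, as stated in the tool description.
SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def effective_pin(pins_by_scope: Dict[str, Optional[str]]) -> Optional[Tuple[str, str]]:
    """Return (pin_scope_type, version_id) for the winning pin, or None.

    pins_by_scope maps a scope name to the pinned version ID at that
    scope (hypothetical input shape), with None or a missing key
    meaning "no pin at this level".
    """
    for scope in SCOPE_PRECEDENCE:
        version_id = pins_by_scope.get(scope)
        if version_id is not None:
            return scope, version_id
    return None


# A workspace-level pin overrides an organization-level pin.
print(effective_pin({"workspace": "version-b", "organization": "version-a"}))
```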

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| connector_version_id | string | yes | | Connector version UUID to find pinned instances for |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "description": "Connector version UUID to find pinned instances for",
      "type": "string"
    }
  },
  "required": [
    "connector_version_id"
  ],
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connection_sync_activity

Hints: read-only · idempotent · open-world

List recent sync jobs and attempts from the Prod DB Replica.

Returns one row per (job, attempt) pair for sync jobs whose updated_at falls in [start_at, end_at), scoped to the provided organization, workspace, or connection IDs. Designed for live operational lookups (e.g. "what happened on this connection in the last hour"), not for historical analysis.

Each row is enriched with customer_tier and is_eu for the owning organization. Tier filtering is intentionally not applied; this is a read-only observability query.

Input requirements:

  • At least one of organization_id, workspace_id, or connection_ids must be provided (any combination is accepted).
  • start_at and end_at must be timezone-aware and start_at < end_at.
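The input requirements above can be checked on the client before calling the tool. The sketch below is an illustration under stated assumptions: the function names are hypothetical, and treating an empty connection_ids list as "not provided" is a guess rather than documented behavior.

```python
from datetime import datetime, timezone
from typing import Optional, Sequence


def validate_sync_activity_inputs(
    start_at: datetime,
    end_at: datetime,
    organization_id: Optional[str] = None,
    workspace_id: Optional[str] = None,
    connection_ids: Optional[Sequence[str]] = None,
) -> None:
    """Raise ValueError if the documented input requirements are not met."""
    # Assumption: an empty list counts as "not provided".
    if organization_id is None and workspace_id is None and not connection_ids:
        raise ValueError(
            "provide at least one of organization_id, workspace_id, connection_ids"
        )
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        # A datetime is timezone-aware only if utcoffset() returns a value.
        if value.tzinfo is None or value.utcoffset() is None:
            raise ValueError(f"{name} must be timezone-aware")
    if not start_at < end_at:
        raise ValueError("start_at must be strictly before end_at")


def in_window(updated_at: datetime, start_at: datetime, end_at: datetime) -> bool:
    """Half-open window test: start inclusive, end exclusive."""
    return start_at <= updated_at < end_at


start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
end = datetime(2024, 1, 1, 13, 0, tzinfo=timezone.utc)
validate_sync_activity_inputs(start, end, workspace_id="@devin-ai-sandbox")
print(in_window(start, start, end), in_window(end, start, end))
```

Because the window is half-open, consecutive windows that share a boundary timestamp do not double-count a job updated exactly at that boundary.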

Key fields in each row:

  • job_id, attempt_id, attempt_number
  • job_status, attempt_status
  • job_started_at, job_updated_at, attempt_ended_at
  • failure_summary (JSON; populated when an attempt failed)
  • connection_id, connection_name, connection_status
  • source_actor_id, source_actor_name, source_actor_definition_id
  • destination_actor_id, destination_actor_name, destination_actor_definition_id
  • workspace_id, workspace_name, organization_id
  • dataplane_group_id, dataplane_name
  • customer_tier, is_eu (added by tier enrichment)
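Since the tool returns one row per (job, attempt) pair, a job with retries appears several times. A minimal sketch of regrouping rows by job, using only the job_id and attempt_number fields listed above, might look like this; the helper name and the sample rows are illustrative.

```python
from collections import defaultdict
from typing import Any, Dict, List


def attempts_by_job(rows: List[Dict[str, Any]]) -> Dict[Any, List[Dict[str, Any]]]:
    """Group (job, attempt) rows by job_id, ordered by attempt_number."""
    grouped: Dict[Any, List[Dict[str, Any]]] = defaultdict(list)
    for row in rows:
        grouped[row["job_id"]].append(row)
    for attempts in grouped.values():
        attempts.sort(key=lambda r: r["attempt_number"])
    return dict(grouped)


# Invented rows: job 1 failed once and then succeeded on retry.
rows = [
    {"job_id": 1, "attempt_number": 1, "attempt_status": "succeeded"},
    {"job_id": 1, "attempt_number": 0, "attempt_status": "failed"},
    {"job_id": 2, "attempt_number": 0, "attempt_status": "succeeded"},
]
for job_id, attempts in attempts_by_job(rows).items():
    print(job_id, [a["attempt_status"] for a in attempts])
```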

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| start_at | string | yes | | Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or Z). |
| end_at | string | yes | | Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after start_at. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @airbyte-internal as an alias for the Airbyte internal org. |
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") \| null | no | null | Optional workspace UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @devin-ai-sandbox as an alias for the Devin AI sandbox workspace. |
| connection_ids | array<string> \| null | no | null | Optional list of connection UUIDs. At least one of organization_id, workspace_id, or connection_ids is required. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: all (default), succeeded, or failed. Applied to jobs.status in the Prod DB Replica. |
| limit | integer | no | 1000 | Maximum number of attempt rows to return. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "start_at": {
      "description": "Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or `Z`).",
      "format": "date-time",
      "type": "string"
    },
    "end_at": {
      "description": "Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after `start_at`.",
      "format": "date-time",
      "type": "string"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@airbyte-internal` as an alias for the Airbyte internal org."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional workspace UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@devin-ai-sandbox` as an alias for the Devin AI sandbox workspace."
    },
    "connection_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional list of connection UUIDs. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required."
    },
    "status_filter": {
      "description": "Filter by job status: `all` (default), `succeeded`, or `failed`. Applied to `jobs.status` in the Prod DB Replica.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of attempt rows to return.",
      "type": "integer"
    }
  },
  "required": [
    "start_at",
    "end_at"
  ],
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_connector

Hints: read-only · idempotent · open-world

Search for all connections using a specific source or destination connector type.

This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.

Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter is always applied so that querying stays tier-aware: it defaults to TIER_2, and ALL includes every tier.

Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.

Set exclude_pinned=True to filter out connections that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Set enabled_schedules_only=True to restrict results to connections that are both enabled (status='active') and on an automated schedule (not manual-trigger-only). This is useful for canary prerelease workflows where you need connections that will run organically during the monitoring window.

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs. Each dict contains:

  • connection_id, connection_name, connection_url
  • source_id, source_name, source_definition_id (source queries) or destination_id, destination_name, destination_definition_id (destination queries)
  • workspace_id, workspace_name, organization_id
  • dataplane_group_id, dataplane_name
  • pin_origin_type, pin_origin, pinned_version_id, pin_scope_type
  • customer_tier, is_eu

pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from (NULL if not pinned).

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 1000 | Maximum number of results (default: 1000) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections). |
| enabled_schedules_only | boolean | no | false | If True, only return connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections). |
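The four connector selectors are individually optional in the schema, but the descriptions state that exactly one of them is required. A hedged sketch of enforcing that rule before calling query_prod_connections_by_connector is shown below; pick_connector_selector is a hypothetical helper, not part of the module.

```python
from typing import Dict, Optional

SELECTOR_NAMES = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def pick_connector_selector(**selectors: Optional[str]) -> Dict[str, str]:
    """Return the single provided selector, or raise if zero or several are set."""
    unexpected = set(selectors) - set(SELECTOR_NAMES)
    if unexpected:
        raise ValueError(f"unknown selector(s): {sorted(unexpected)}")
    provided = {
        name: value for name, value in selectors.items() if value is not None
    }
    if len(provided) != 1:
        raise ValueError(
            f"exactly one of {', '.join(SELECTOR_NAMES)} is required; "
            f"got {len(provided)}"
        )
    return provided


# Build a full argument dict using the documented defaults for the rest.
args = {
    **pick_connector_selector(source_canonical_name="source-youtube-analytics"),
    "customer_tier_filter": "TIER_2",
    "exclude_pinned": True,
}
print(args)
```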

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 1000,
      "description": "Maximum number of results (default: 1000)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
      "type": "boolean"
    },
    "enabled_schedules_only": {
      "default": false,
      "description": "If True, only return connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connections_by_stream

Hints: read-only · idempotent · open-world

Find connections that have a specific stream enabled in their catalog.

This tool searches the connection's configured catalog (JSONB) for streams matching the specified name. It is particularly useful when validating connector fixes that affect specific streams, because you can quickly find customer connections that use the affected stream.

Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter is always applied so that querying stays tier-aware: it defaults to TIER_2, and ALL includes every tier.

Use cases:

  • Finding connections with a specific stream enabled for regression testing
  • Validating connector fixes that affect particular streams
  • Identifying which customers use rarely-enabled streams

Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
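To make the catalog search concrete, the sketch below applies the same exact-name match to an in-memory catalog. It assumes the configured catalog is shaped like {"streams": [{"stream": {"name": ...}}]}; that shape is an assumption for illustration, and the tool itself performs the match inside the database rather than in Python.

```python
from typing import Any, Dict


def catalog_has_stream(catalog: Dict[str, Any], stream_name: str) -> bool:
    """Return True if any configured stream has exactly this name.

    Assumed catalog shape: {"streams": [{"stream": {"name": "..."}}]}.
    The comparison is exact and case-sensitive, mirroring the
    "must match the exact stream name" requirement.
    """
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )


# Invented catalog with two configured streams.
catalog = {
    "streams": [
        {"stream": {"name": "campaigns"}},
        {"stream": {"name": "global_exclusions"}},
    ]
}
print(catalog_has_stream(catalog, "global_exclusions"))
print(catalog_has_stream(catalog, "Campaigns"))
```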

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| stream_name | string | yes | | Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'. |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "stream_name": {
      "description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
      "type": "string"
    },
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "required": [
    "stream_name"
  ],
  "type": "object"
}

Output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_connector_connection_stats

Hints: read-only · idempotent · open-world

Get aggregate connection stats for multiple connectors.

Returns counts of connections grouped by pinned version for each connector, including:

  • Total, enabled, and active connection counts
  • Pinned vs unpinned breakdown
  • Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

This tool is designed for release monitoring workflows. It allows you to:

  1. Query recently released connectors to identify which ones to monitor
  2. Get aggregate stats showing how many connections are using each version
  3. See health metrics (pass/fail) broken down by version

The lookback_days parameter controls the lookback window for:

  • Counting 'active' connections (those with recent sync activity)
  • Determining 'latest attempt status' (most recent attempt within the window)

Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
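The lookback behavior described above can be sketched as follows. This is an illustration only: the function name, the input shape (a mapping from connection ID to (timestamp, status) pairs), and the inclusive cutoff boundary are all assumptions, and the actual tool computes these counts in the database.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Tuple

STATUSES = ("succeeded", "failed", "cancelled", "running", "unknown")


def latest_attempt_breakdown(
    attempts_by_connection: Dict[str, List[Tuple[datetime, str]]],
    now: datetime,
    lookback_days: int = 7,
) -> Dict[str, int]:
    """Count connections by the status of their latest attempt in the window."""
    cutoff = now - timedelta(days=lookback_days)
    counts = {status: 0 for status in STATUSES}
    for attempts in attempts_by_connection.values():
        # Assumption: an attempt exactly at the cutoff is inside the window.
        recent = [(ts, status) for ts, status in attempts if ts >= cutoff]
        if not recent:
            # No activity in the window: reported as 'unknown'.
            counts["unknown"] += 1
            continue
        _, latest_status = max(recent, key=lambda pair: pair[0])
        counts[latest_status] += 1
    return counts


now = datetime(2024, 1, 10, tzinfo=timezone.utc)
attempts = {
    "conn-a": [(now - timedelta(days=1), "failed"), (now - timedelta(hours=2), "succeeded")],
    "conn-b": [(now - timedelta(days=30), "succeeded")],  # outside the window
    "conn-c": [],  # never synced
}
print(latest_attempt_breakdown(attempts, now))
```

Note that conn-b last succeeded 30 days ago yet is counted as unknown, which is why a larger lookback_days can shift counts out of the unknown bucket.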

Parameters:

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| source_definition_ids | array<string> \| null | no | null | List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139'] |
| destination_definition_ids | array<string> \| null | no | null | List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610'] |
| lookback_days | integer | no | 7 | Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active. |

Input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
    },
    "destination_definition_ids": {
      "anyOf": [
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
      "type": "integer"
    }
  },
  "type": "object"
}

Output JSON schema

{
  "description": "Response containing connection stats for multiple connectors.",
  "properties": {
    "sources": {
      "description": "Stats for source connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "destinations": {
      "description": "Stats for destination connectors",
      "items": {
        "description": "Aggregate connection stats for a connector.",
        "properties": {
          "connector_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "connector_type": {
            "description": "'source' or 'destination'",
            "type": "string"
          },
          "canonical_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The canonical connector name if resolved"
          },
          "total_connections": {
            "description": "Total number of non-deprecated connections",
            "type": "integer"
          },
          "enabled_connections": {
            "description": "Number of enabled (active status) connections",
            "type": "integer"
          },
          "active_connections": {
            "description": "Number of connections with recent sync activity",
            "type": "integer"
          },
          "pinned_connections": {
            "description": "Number of connections with explicit version pins",
            "type": "integer"
          },
          "unpinned_connections": {
            "description": "Number of connections on default version",
            "type": "integer"
          },
          "latest_attempt": {
            "description": "Overall breakdown by latest attempt status",
            "properties": {
              "succeeded": {
                "default": 0,
                "description": "Connections where latest attempt succeeded",
                "type": "integer"
              },
              "failed": {
                "default": 0,
                "description": "Connections where latest attempt failed",
                "type": "integer"
              },
              "cancelled": {
                "default": 0,
                "description": "Connections where latest attempt was cancelled",
                "type": "integer"
              },
              "running": {
                "default": 0,
                "description": "Connections where latest attempt is still running",
                "type": "integer"
              },
              "unknown": {
                "default": 0,
                "description": "Connections with no recent attempts in the lookback window",
                "type": "integer"
              }
            },
            "type": "object"
          },
          "by_version": {
            "description": "Stats broken down by pinned version",
            "items": {
              "description": "Stats for connections pinned to a specific version.",
              "properties": {
                "pinned_version_id": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "description": "The connector version UUID (None for unpinned connections)"
                },
                "docker_image_tag": {
                  "anyOf": [
                    {
                      "type": "string"
                    },
                    {
                      "type": "null"
                    }
                  ],
                  "default": null,
                  "description": "The docker image tag for this version"
                },
                "total_connections": {
                  "description": "Total number of connections",
                  "type": "integer"
                },
                "enabled_connections": {
                  "description": "Number of enabled (active status) connections",
                  "type": "integer"
                },
                "active_connections": {
                  "description": "Number of connections with recent sync activity",
                  "type": "integer"
                },
                "latest_attempt": {
                  "description": "Breakdown by latest attempt status",
                  "properties": {
                    "succeeded": {
                      "default": 0,
                      "description": "Connections where latest attempt succeeded",
                      "type": "integer"
                    },
                    "failed": {
                      "default": 0,
                      "description": "Connections where latest attempt failed",
                      "type": "integer"
                    },
                    "cancelled": {
                      "default": 0,
                      "description": "Connections where latest attempt was cancelled",
                      "type": "integer"
                    },
                    "running": {
                      "default": 0,
                      "description": "Connections where latest attempt is still running",
                      "type": "integer"
                    },
                    "unknown": {
                      "default": 0,
                      "description": "Connections with no recent attempts in the lookback window",
                      "type": "integer"
                    }
                  },
                  "type": "object"
                }
              },
              "required": [
                "pinned_version_id",
                "total_connections",
                "enabled_connections",
                "active_connections",
                "latest_attempt"
              ],
              "type": "object"
            },
            "type": "array"
          }
        },
        "required": [
          "connector_definition_id",
          "connector_type",
          "total_connections",
          "enabled_connections",
          "active_connections",
          "pinned_connections",
          "unpinned_connections",
          "latest_attempt",
          "by_version"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "lookback_days": {
      "description": "Lookback window used for 'active' connections",
      "type": "integer"
    },
    "generated_at": {
      "description": "When this response was generated",
      "format": "date-time",
      "type": "string"
    }
  },
  "required": [
    "lookback_days",
    "generated_at"
  ],
  "type": "object"
}
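
As a rough illustration of how the latest_attempt breakdown in this output might be consumed, the sketch below derives a per-version failure rate from by_version. The helper names, the "(no tag)" fallback label, and the definition of failure rate (failed divided by succeeded plus failed, ignoring cancelled, running, and unknown) are assumptions made for this example, not part of the tool.

```python
from typing import Optional


def failure_rate(latest_attempt: dict) -> Optional[float]:
    """Share of finished attempts that failed, or None if nothing finished."""
    # Counters default to 0 in the schema, so missing keys are treated as 0.
    succeeded = latest_attempt.get("succeeded", 0)
    failed = latest_attempt.get("failed", 0)
    finished = succeeded + failed
    return failed / finished if finished else None


def failure_rate_by_tag(connector_stats: dict) -> dict:
    """Map each by_version entry's docker_image_tag to its failure rate."""
    return {
        (entry.get("docker_image_tag") or "(no tag)"): failure_rate(entry["latest_attempt"])
        for entry in connector_stats["by_version"]
    }


# Hypothetical data shaped like one item of "sources" or "destinations".
example_stats = {
    "by_version": [
        {
            "pinned_version_id": "hypothetical-version-id",
            "docker_image_tag": "1.2.3",
            "latest_attempt": {"succeeded": 8, "failed": 2, "running": 1},
        },
        {
            "pinned_version_id": None,
            "docker_image_tag": None,
            "latest_attempt": {"unknown": 4},
        },
    ]
}

rates = failure_rate_by_tag(example_stats)
```

Returning None rather than 0.0 when no attempt has finished keeps "no data in the lookback window" distinguishable from "no failures".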

query_prod_connector_rollouts

Hints: read-only · idempotent

Query connector rollouts with flexible filtering.

Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.

Filter behavior:

  • rollout_id: Returns that specific rollout (ignores other filters)
  • active_only: Returns only active (non-terminal) rollouts
  • actor_definition_id: Returns rollouts for that specific connector
  • No filters: Returns all active rollouts (same as active_only=True)
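
The precedence described above can be modelled with a small in-memory function. This is only a sketch of one reading of the documented behavior: select_rollouts is a hypothetical name, it does not query the database, and the set of terminal states is an assumption because this section does not say which states are terminal.

```python
from typing import Optional

# Assumption: these states are treated as terminal for the sketch.
TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def select_rollouts(
    rollouts: list,
    actor_definition_id: Optional[str] = None,
    rollout_id: Optional[str] = None,
    active_only: bool = False,
    limit: int = 100,
) -> list:
    """Hypothetical in-memory model of the documented filter precedence."""
    # rollout_id wins: every other filter is ignored.
    if rollout_id is not None:
        return [r for r in rollouts if r["rollout_id"] == rollout_id]

    # With no filters at all, behave as if active_only=True.
    if actor_definition_id is None and not active_only:
        active_only = True

    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    if active_only:
        selected = [r for r in selected if r["state"] not in TERMINAL_STATES]
    return selected[:limit]


# Hypothetical rollouts with shortened IDs for readability.
sample_rollouts = [
    {"rollout_id": "r1", "actor_definition_id": "a1", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "a1", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "a2", "state": "paused"},
]
```

Under this model, passing only actor_definition_id also returns terminal rollouts for that connector, which is what makes the tool usable for rollout history.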

Parameters:

Name Type Required Default Description
actor_definition_id string | null no null Connector definition UUID to filter by (optional)
rollout_id string | null no null Specific rollout UUID to look up (optional)
active_only boolean no false If true, only return active (non-terminal) rollouts
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "actor_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector definition UUID to filter by (optional)"
    },
    "rollout_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Specific rollout UUID to look up (optional)"
    },
    "active_only": {
      "default": false,
      "description": "If true, only return active (non-terminal) rollouts",
      "type": "boolean"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a connector rollout.",
        "properties": {
          "rollout_id": {
            "description": "The rollout UUID",
            "type": "string"
          },
          "actor_definition_id": {
            "description": "The connector definition UUID",
            "type": "string"
          },
          "state": {
            "description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
            "type": "string"
          },
          "initial_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Initial rollout percentage"
          },
          "current_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Current target rollout percentage"
          },
          "final_target_rollout_pct": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Final target rollout percentage"
          },
          "has_breaking_changes": {
            "description": "Whether the RC has breaking changes",
            "type": "boolean"
          },
          "max_step_wait_time_mins": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Maximum wait time between rollout steps in minutes"
          },
          "rollout_strategy": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Rollout strategy: manual, automated, overridden"
          },
          "updated_by_user_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "User ID recorded as last updating the rollout"
          },
          "updated_by_user_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Name recorded as last updating the rollout"
          },
          "updated_by_user_email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Email recorded as last updating the rollout"
          },
          "workflow_run_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Temporal workflow run ID"
          },
          "error_msg": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Error message if errored"
          },
          "failed_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for failure if failed"
          },
          "paused_reason": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Reason for pause if paused"
          },
          "tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Optional tag for the rollout"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was created"
          },
          "updated_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout was last updated"
          },
          "completed_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout completed (if terminal)"
          },
          "expires_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the rollout expires"
          },
          "rc_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the release candidate"
          },
          "rc_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the release candidate"
          },
          "initial_docker_image_tag": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker image tag of the initial version"
          },
          "initial_docker_repository": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Docker repository of the initial version"
          },
          "filters": {
            "anyOf": [
              {
                "additionalProperties": true,
                "type": "object"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
          }
        },
        "required": [
          "rollout_id",
          "actor_definition_id",
          "state",
          "has_breaking_changes"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}
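
Because the output schema wraps the list under a required result key, a consumer has to unwrap it before iterating. The sketch below shows one way to do that and to summarize rollouts by state. The summarize_rollouts helper and the sample payload are hypothetical; only the field names come from the schema above.

```python
from collections import Counter


def summarize_rollouts(payload: dict) -> dict:
    """Count rollouts per state and list paused rollouts with their reasons."""
    rollouts = payload["result"]  # the list is wrapped under "result"
    by_state = Counter(r["state"] for r in rollouts)
    paused = [
        (r["rollout_id"], r.get("paused_reason"))  # paused_reason may be null
        for r in rollouts
        if r["state"] == "paused"
    ]
    return {"by_state": dict(by_state), "paused": paused}


# Hypothetical payload containing only a few of the documented fields.
example_payload = {
    "result": [
        {"rollout_id": "r1", "actor_definition_id": "a1", "state": "in_progress",
         "has_breaking_changes": False},
        {"rollout_id": "r3", "actor_definition_id": "a2", "state": "paused",
         "has_breaking_changes": False, "paused_reason": "manual hold"},
    ]
}

summary = summarize_rollouts(example_payload)
```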

query_prod_connector_versions

Hints: read-only · idempotent

List all versions for a connector definition.

Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.

Returns a list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
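
A common follow-up is to look up the version_id for a known image tag, for example before pinning. The sketch below assumes the rows have already been unwrapped from the result key. The find_version_id helper and the sample rows are hypothetical.

```python
from typing import Optional


def find_version_id(versions: list, docker_image_tag: str) -> Optional[str]:
    """Return the version_id whose docker_image_tag matches, or None."""
    for version in versions:
        if version.get("docker_image_tag") == docker_image_tag:
            return version.get("version_id")
    return None


# Hypothetical rows with placeholder IDs, newest first as the tool documents.
example_versions = [
    {"version_id": "hypothetical-id-2", "docker_image_tag": "2.0.1",
     "last_published": "2024-06-14T12:00:00+00:00"},
    {"version_id": "hypothetical-id-1", "docker_image_tag": "2.0.0",
     "last_published": "2024-05-01T09:30:00+00:00"},
]

version_id = find_version_id(example_versions, "2.0.0")
```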

Parameters:

Name Type Required Default Description
connector_definition_id string yes Connector definition UUID to list versions for

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_definition_id": {
      "description": "Connector definition UUID to list versions for",
      "type": "string"
    }
  },
  "required": [
    "connector_definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_dataplanes

Hints: read-only · idempotent

List all dataplane groups with workspace counts.

Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).

Returns a list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
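
To see how workspaces are distributed, the rows can be totalled per dataplane_name. The sketch below is a minimal example with hypothetical data. The extra check on enabled and tombstone is a defensive assumption, since the tool is described as returning active groups already.

```python
def workspaces_per_dataplane(rows: list) -> dict:
    """Sum workspace_count per dataplane_name, skipping inactive groups."""
    totals = {}
    for row in rows:
        # Defensive filter; the tool may already exclude these rows.
        if row.get("tombstone") or not row.get("enabled", True):
            continue
        name = row["dataplane_name"]
        totals[name] = totals.get(name, 0) + int(row["workspace_count"])
    return totals


# Hypothetical rows; the names and counts are invented for illustration.
example_dataplanes = [
    {"dataplane_name": "US", "enabled": True, "tombstone": False, "workspace_count": 120},
    {"dataplane_name": "EU", "enabled": True, "tombstone": False, "workspace_count": 45},
    {"dataplane_name": "EU", "enabled": True, "tombstone": False, "workspace_count": 5},
    {"dataplane_name": "legacy", "enabled": False, "tombstone": True, "workspace_count": 9},
]

totals = workspaces_per_dataplane(example_dataplanes)
```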

Parameters:

_No parameters._

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {},
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_failed_sync_attempts_for_connector

Hints: read-only · idempotent · open-world

List failed sync attempts for ALL actors using a source connector type.

This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.

Results are always enriched with customer_tier and is_eu fields. A customer_tier_filter is always applied to keep queries tier-aware: it defaults to 'TIER_2', and 'ALL' includes every tier.

This is useful for investigating connector issues across all users. Use this when you want to find failures for a connector type regardless of which version users are on.

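Note: This tool only supports SOURCE connectors. For failed sync jobs on destination connectors, use query_prod_recent_syncs_for_connector with status_filter='failed', which accepts destination_definition_id or destination_canonical_name.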

Key fields in results:

  • failure_summary: JSON containing failure details including failureType and messages
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
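
When triaging many rows, it often helps to tally failureType values from failure_summary. The exact layout of that JSON is not specified here, so the sketch below makes no assumption about nesting and simply walks the structure looking for any failureType key. The helper names and sample rows are hypothetical, and the sketch accepts failure_summary either as a JSON string or as an already-decoded object.

```python
import json
from collections import Counter


def collect_failure_types(node) -> list:
    """Recursively collect every 'failureType' string in a decoded JSON value."""
    found = []
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "failureType" and isinstance(value, str):
                found.append(value)
            else:
                found.extend(collect_failure_types(value))
    elif isinstance(node, list):
        for item in node:
            found.extend(collect_failure_types(item))
    return found


def failure_type_counts(rows: list) -> Counter:
    """Tally failureType values across result rows."""
    counts = Counter()
    for row in rows:
        summary = row.get("failure_summary")
        if summary is None:
            continue
        if isinstance(summary, str):  # may arrive as a JSON string
            summary = json.loads(summary)
        counts.update(collect_failure_types(summary))
    return counts


# Hypothetical rows; the "failures" wrapper and the type names are invented.
example_rows = [
    {"customer_tier": "TIER_2",
     "failure_summary": '{"failures": [{"failureType": "config_error"}]}'},
    {"customer_tier": "TIER_2",
     "failure_summary": {"failures": [{"failureType": "system_error"},
                                      {"failureType": "config_error"}]}},
    {"customer_tier": "TIER_2", "failure_summary": None},
]

counts = failure_type_counts(example_rows)
```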

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
lookback_days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_new_connector_releases

Hints: read-only · idempotent

List recently published connector versions.

Returns connector versions published within the specified number of days. Uses the last_published timestamp, which reflects when the version was actually deployed to the registry (not the changelog date).

Returns a list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
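
The lookback window can be pictured as a cutoff applied to last_published. The sketch below reproduces that idea on in-memory rows. It assumes last_published arrives as an ISO 8601 string with a UTC offset and that the boundary is inclusive, neither of which is stated here, and published_within is a hypothetical helper rather than the query itself.

```python
from datetime import datetime, timedelta, timezone


def published_within(releases: list, days: int, now: datetime) -> list:
    """Keep releases whose last_published falls within the last `days` days."""
    cutoff = now - timedelta(days=days)
    return [
        release
        for release in releases
        if datetime.fromisoformat(release["last_published"]) >= cutoff
    ]


# Hypothetical rows and a fixed "now" so the example is deterministic.
example_releases = [
    {"docker_repository": "airbyte/source-postgres", "docker_image_tag": "2.0.1",
     "last_published": "2024-06-14T12:00:00+00:00"},
    {"docker_repository": "airbyte/source-postgres", "docker_image_tag": "2.0.0",
     "last_published": "2024-06-01T09:30:00+00:00"},
]
fixed_now = datetime(2024, 6, 15, tzinfo=timezone.utc)

recent = published_within(example_releases, days=7, now=fixed_now)
```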

Parameters:

Name Type Required Default Description
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_organizations

Hints: read-only · idempotent

Search organizations by name or email substring.

Performs a case-insensitive substring match on organization name and email. Use the returned organization_id values with other tools like query_prod_connections_by_connector or lookup_customer_tiers.
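
The matching rule can be illustrated in plain Python. This is a sketch of the semantics only, not the query the tool runs. The organization_matches helper and the sample records are hypothetical.

```python
def organization_matches(org: dict, name_contains: str) -> bool:
    """Case-insensitive substring match against organization name or email."""
    needle = name_contains.casefold()
    name = (org.get("organization_name") or "").casefold()
    email = (org.get("email") or "").casefold()  # email may be null
    return needle in name or needle in email


# Hypothetical organizations with placeholder IDs.
example_orgs = [
    {"organization_id": "org-1", "organization_name": "Acme Corp", "email": None},
    {"organization_id": "org-2", "organization_name": "Widgets Inc", "email": "admin@acme.io"},
    {"organization_id": "org-3", "organization_name": "Other Co", "email": "ops@other.example"},
]

matched_ids = [
    org["organization_id"]
    for org in example_orgs
    if organization_matches(org, "acme")
]
```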

Parameters:

Name Type Required Default Description
name_contains string yes Case-insensitive substring to search for in organization name or email. For example, 'acme' will match organizations named 'Acme Corp' or with email 'admin@acme.io'.
limit integer no 20 Maximum number of organizations to return (default: 20)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name_contains": {
      "description": "Case-insensitive substring to search for in organization name or email. For example, 'acme' will match organizations named 'Acme Corp' or with email 'admin@acme.io'.",
      "type": "string"
    },
    "limit": {
      "default": 20,
      "description": "Maximum number of organizations to return (default: 20)",
      "type": "integer"
    }
  },
  "required": [
    "name_contains"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of searching organizations by name substring.",
  "properties": {
    "name_contains": {
      "description": "The search substring that was used",
      "type": "string"
    },
    "total_found": {
      "description": "Total number of organizations matching",
      "type": "integer"
    },
    "organizations": {
      "description": "List of matching organizations",
      "items": {
        "description": "A single organization returned by a name/email search.",
        "properties": {
          "organization_id": {
            "description": "The organization UUID",
            "type": "string"
          },
          "organization_name": {
            "description": "The name of the organization",
            "type": "string"
          },
          "email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The email address associated with the organization"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
          }
        },
        "required": [
          "organization_id",
          "organization_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "name_contains",
    "total_found",
    "organizations"
  ],
  "type": "object"
}

query_prod_recent_syncs_for_connector

Hints: read-only · idempotent · open-world

List recent sync jobs for ALL actors using a connector type.

This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.

Results are always enriched with customer_tier and is_eu fields. A customer_tier_filter is always applied to keep queries tier-aware.

Use this tool to:

  • Find healthy connections with recent successful syncs (status_filter='succeeded')
  • Investigate connector issues across all users (status_filter='failed')
  • Get an overview of all recent sync activity (status_filter='all')

Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.

Set enabled_schedules_only=True to restrict results to connections that are both enabled (status='active') and on an automated schedule (not manual-trigger-only). This is useful for canary prerelease workflows where you need connections that will run organically during the monitoring window.

Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
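
Since all four connector selectors are optional in the input schema, the exactly-one rule is easy to violate by accident. The sketch below is a client-side check that could run before the tool is called. The build_recent_syncs_args helper is hypothetical, and how the server itself reports a violation is not described here.

```python
CONNECTOR_SELECTORS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def build_recent_syncs_args(**arguments) -> dict:
    """Validate the exactly-one-selector rule and drop unset (None) arguments."""
    provided = [name for name in CONNECTOR_SELECTORS if arguments.get(name) is not None]
    if len(provided) != 1:
        raise ValueError(
            "Provide exactly one of "
            + ", ".join(CONNECTOR_SELECTORS)
            + "; got: "
            + (", ".join(provided) or "none")
        )
    return {key: value for key, value in arguments.items() if value is not None}


example_args = build_recent_syncs_args(
    source_canonical_name="source-youtube-analytics",
    destination_canonical_name=None,
    status_filter="succeeded",
)
```

Dropping None values lets the tool's own defaults apply to anything the caller did not set.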

Key fields in results:

  • job_status: 'succeeded', 'failed', 'cancelled', etc.
  • connection_id, connection_name: The connection that ran the sync
  • actor_id, actor_name: The source or destination actor
  • customer_tier: TIER_0, TIER_1, or TIER_2
  • is_eu: Whether the workspace is in the EU region
  • pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
  • pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
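As a rough illustration of how the key fields above might be consumed, the sketch below summarizes a list of result rows. The rows are invented sample data and `summarize_syncs` is a hypothetical helper; only the field names come from this reference.

```python
from __future__ import annotations

from collections import Counter
from typing import Any

# Illustrative rows only; real rows come from the tool's `result` list.
sample_rows: list[dict[str, Any]] = [
    {"connection_id": "conn-a", "job_status": "succeeded",
     "customer_tier": "TIER_2", "is_eu": False, "pinned_version_id": None},
    {"connection_id": "conn-a", "job_status": "failed",
     "customer_tier": "TIER_2", "is_eu": False, "pinned_version_id": None},
    {"connection_id": "conn-b", "job_status": "succeeded",
     "customer_tier": "TIER_1", "is_eu": True,
     "pinned_version_id": "hypothetical-version-uuid"},
]


def summarize_syncs(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Count jobs per status and list unpinned and EU connections."""
    by_status = Counter(row["job_status"] for row in rows)
    unpinned = sorted(
        {row["connection_id"] for row in rows if row.get("pinned_version_id") is None}
    )
    eu = sorted({row["connection_id"] for row in rows if row.get("is_eu")})
    return {
        "by_status": dict(by_status),
        "unpinned_connection_ids": unpinned,
        "eu_connection_ids": eu,
    }


summary = summarize_syncs(sample_rows)
```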

Parameters:

Name Type Required Default Description
source_definition_id string | null no null Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics.
source_canonical_name string | null no null Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'.
destination_definition_id string | null no null Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB.
destination_canonical_name string | null no null Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'.
status_filter enum("all", "succeeded", "failed") no "all" Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.
organization_id string | enum("664c690e-5263-49ba-b01f-4a6759b3330a") | null no null Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org.
lookback_days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
customer_tier_filter enum("TIER_0", "TIER_1", "TIER_2", "ALL") no "TIER_2" Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.
exclude_pinned boolean no false If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).
enabled_schedules_only boolean no false If True, only return syncs for connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
    },
    "source_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
    },
    "destination_definition_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
    },
    "destination_canonical_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
    },
    "status_filter": {
      "description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
      "enum": [
        "all",
        "succeeded",
        "failed"
      ],
      "type": "string",
      "default": "all"
    },
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "664c690e-5263-49ba-b01f-4a6759b3330a"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
    },
    "lookback_days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "customer_tier_filter": {
      "default": "TIER_2",
      "description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
      "enum": [
        "TIER_0",
        "TIER_1",
        "TIER_2",
        "ALL"
      ],
      "type": "string"
    },
    "exclude_pinned": {
      "default": false,
      "description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
      "type": "boolean"
    },
    "enabled_schedules_only": {
      "default": false,
      "description": "If True, only return syncs for connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections).",
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_recent_syncs_for_connector_version

Hints: read-only · idempotent

List sync jobs that were run with a specific connector version.

Works for both source and destination connectors. Automatically detects the connector type from the version metadata and uses the appropriate query variant.

Accepts either connector_version_id (UUID) or connector_name + connector_version (e.g. source-pokeapi + 0.3.59). When using name + version, the docker_repository is derived from the canonical name (e.g. source-pokeapi → airbyte/source-pokeapi).
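The two accepted input shapes, and the name-to-repository mapping, can be sketched as follows. Both functions are hypothetical illustrations rather than the module's implementation; in particular, giving the UUID priority when both shapes are supplied is an assumption, since this reference does not say how that case is handled.

```python
from __future__ import annotations


def docker_repository_for(connector_name: str) -> str:
    """Map a canonical name to its docker repository (airbyte/<name>)."""
    return f"airbyte/{connector_name}"


def pick_version_selector(
    connector_version_id: str | None = None,
    connector_name: str | None = None,
    connector_version: str | None = None,
) -> str:
    """Report which input shape was supplied, or raise if neither is complete."""
    # Assumption: the UUID wins if both shapes are given.
    if connector_version_id:
        return "by_version_id"
    if connector_name and connector_version:
        return "by_name_and_version"
    raise ValueError(
        "Provide connector_version_id, or both connector_name and connector_version."
    )


repo = docker_repository_for("source-pokeapi")
mode = pick_version_selector(connector_name="source-pokeapi", connector_version="0.3.59")
```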

Filters on the version stamped into jobs.config at job-creation time, not the current pin state. This avoids false positives (pre-pin syncs counted as RC) and false negatives (post-unpin syncs missed).

Pin columns (pin_origin_type, pin_origin, pin_scope_type) are still included as informational output but are not used for filtering.

Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, actor_definition_id, source_definition_version_id, destination_definition_version_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name.

Parameters:

Name Type Required Default Description
connector_version_id string | null no null Connector version UUID. Provide this OR connector_name + connector_version.
connector_name string | null no null Canonical connector name (e.g. source-pokeapi, destination-duckdb). Used with connector_version to resolve the version UUID.
connector_version string | null no null Semver version tag (e.g. 0.3.59). Used with connector_name to resolve the version UUID.
days integer no 7 Number of days to look back (default: 7)
limit integer no 100 Maximum number of results (default: 100)
successful_only boolean no false If True, only return successful syncs (default: False)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connector_version_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Connector version UUID. Provide this OR connector_name + connector_version."
    },
    "connector_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Canonical connector name (e.g. `source-pokeapi`, `destination-duckdb`). Used with `connector_version` to resolve the version UUID."
    },
    "connector_version": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Semver version tag (e.g. `0.3.59`). Used with `connector_name` to resolve the version UUID."
    },
    "days": {
      "default": 7,
      "description": "Number of days to look back (default: 7)",
      "type": "integer"
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of results (default: 100)",
      "type": "integer"
    },
    "successful_only": {
      "default": false,
      "description": "If `True`, only return successful syncs (default: `False`)",
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_workspace_info

Hints: read-only · idempotent

Get workspace information including dataplane group.

Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.

Returns dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone. Returns None if the workspace is not found.
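A caller has to handle both the dict and the None result. The sketch below shows one way to turn the result into an EU check. The rule used here (an exact, case-insensitive match on a dataplane named 'EU') is an assumption for illustration; the server's own derivation may differ, and `is_eu_workspace` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Any


def is_eu_workspace(info: dict[str, Any] | None) -> bool | None:
    """Return True/False for a known dataplane, or None when it cannot be told."""
    if info is None:
        # The tool returns None when the workspace is not found.
        return None
    name = info.get("dataplane_name")
    if not name:
        return None
    # Assumption: the EU dataplane is named exactly 'EU' (case-insensitive).
    return str(name).strip().upper() == "EU"


eu_flag = is_eu_workspace({"workspace_id": "hypothetical-id", "dataplane_name": "EU"})
missing = is_eu_workspace(None)
```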

Parameters:

Name Type Required Default Description
workspace_id string | enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") yes Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
          "enum": [
            "266ebdfe-0d7b-4540-9817-de7e4505ba61"
          ],
          "type": "string"
        }
      ],
      "description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
    }
  },
  "required": [
    "workspace_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

query_prod_workspaces

Hints: read-only · idempotent

Search workspaces by name substring or email domain.

At least one of name_contains or email_domain must be provided. When name_contains is given, performs a case-insensitive substring match on workspace name and slug. When email_domain is given, matches workspaces by user email domain.

The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.
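To illustrate how a search result might feed a follow-up query, the sketch below collects distinct organization IDs from the `workspaces` list. The sample payload is invented and `unique_org_ids` is a hypothetical helper; the tool already returns `unique_organization_ids`, so this only shows how that field relates to the per-workspace rows.

```python
from __future__ import annotations

from typing import Any

# Invented sample shaped like the documented output schema.
search_result: dict[str, Any] = {
    "name_contains": "acme",
    "email_domain": None,
    "total_workspaces_found": 3,
    "workspaces": [
        {"organization_id": "org-1", "workspace_id": "ws-1", "workspace_name": "Acme Staging"},
        {"organization_id": "org-1", "workspace_id": "ws-2", "workspace_name": "Acme Prod"},
        {"organization_id": "org-2", "workspace_id": "ws-3", "workspace_name": "acme-sandbox"},
    ],
}


def unique_org_ids(workspaces: list[dict[str, Any]]) -> list[str]:
    """Deduplicate organization IDs, keeping first-seen order."""
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(ws["organization_id"] for ws in workspaces))


org_ids = unique_org_ids(search_result["workspaces"])
```

Each ID in `org_ids` could then be passed as the organization filter of another tool, such as `query_prod_connections_by_connector`.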

Parameters:

Name Type Required Default Description
name_contains string | null no null Case-insensitive substring to search for in workspace name or slug. For example, 'acme' will match workspaces named 'Acme Staging'.
email_domain string | null no null Email domain to search for (e.g., 'motherduck.com'). Do not include the '@' symbol.
limit integer no 100 Maximum number of workspaces to return (default: 100)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Case-insensitive substring to search for in workspace name or slug. For example, 'acme' will match workspaces named 'Acme Staging'."
    },
    "email_domain": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Email domain to search for (e.g., 'motherduck.com'). Do not include the '@' symbol."
    },
    "limit": {
      "default": 100,
      "description": "Maximum number of workspaces to return (default: 100)",
      "type": "integer"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of searching workspaces by name or email domain.",
  "properties": {
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name substring that was searched for"
    },
    "email_domain": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The email domain that was searched for (e.g., 'motherduck.com')"
    },
    "total_workspaces_found": {
      "description": "Total number of workspaces matching",
      "type": "integer"
    },
    "unique_organization_ids": {
      "description": "List of unique organization IDs found",
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "workspaces": {
      "description": "List of matching workspaces",
      "items": {
        "description": "Information about a workspace.",
        "properties": {
          "organization_id": {
            "description": "The organization UUID",
            "type": "string"
          },
          "workspace_id": {
            "description": "The workspace UUID",
            "type": "string"
          },
          "workspace_name": {
            "description": "The name of the workspace",
            "type": "string"
          },
          "slug": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The workspace slug (URL-friendly identifier)"
          },
          "email": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The email address associated with the workspace"
          },
          "dataplane_group_id": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The dataplane group UUID (region)"
          },
          "dataplane_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "The name of the dataplane (e.g., 'US', 'EU')"
          },
          "created_at": {
            "anyOf": [
              {
                "format": "date-time",
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "When the workspace was created"
          },
          "customer_tier": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
          },
          "is_eu": {
            "anyOf": [
              {
                "type": "boolean"
              },
              {
                "type": "null"
              }
            ],
            "default": null,
            "description": "Whether the workspace is in the EU region (derived from dataplane_name)."
          }
        },
        "required": [
          "organization_id",
          "workspace_id",
          "workspace_name"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "total_workspaces_found",
    "unique_organization_ids",
    "workspaces"
  ],
  "type": "object"
}

   1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
   2"""MCP tools for querying the Airbyte Cloud Prod DB Replica.
   3
   4This module provides MCP tools that wrap the query functions from
   5airbyte_ops_mcp.prod_db_access.queries for use by AI agents.
   6
   7## MCP reference
   8
   9.. include:: ../../../docs/mcp-generated/prod_db_queries.md
  10    :start-line: 2
  11"""
  12
  13from __future__ import annotations
  14
  15__all__: list[str] = []
  16
  17import json
  18from datetime import datetime, timezone
  19from enum import StrEnum
  20from typing import Annotated, Any
  21
  22from airbyte.exceptions import PyAirbyteInputError
  23from fastmcp import FastMCP
  24from fastmcp_extensions import mcp_tool, register_mcp_tools
  25from pydantic import BaseModel, Field
  26
  27from airbyte_ops_mcp.cloud_admin.registry_lookup import (
  28    resolve_canonical_name_to_definition_id,
  29)
  30from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum
  31from airbyte_ops_mcp.prod_db_access.queries import (
  32    is_source_connector,
  33    query_actors_pinned_to_version,
  34    query_connection_sync_activity_from_prod,
  35    query_connections_by_connector,
  36    query_connections_by_destination_connector,
  37    query_connections_by_stream,
  38    query_connector_rollouts,
  39    query_connector_versions,
  40    query_dataplanes_list,
  41    query_destination_connection_stats,
  42    query_failed_sync_attempts_for_connector,
  43    query_new_connector_releases,
  44    query_recent_syncs_for_connector,
  45    query_source_connection_stats,
  46    query_syncs_for_connector_version,
  47    query_versions_with_pins,
  48    query_workspace_info,
  49    query_workspaces_by_email_domain,
  50    resolve_version_id_by_tag,
  51    resolve_version_info,
  52    search_organizations,
  53    search_workspaces,
  54)
  55from airbyte_ops_mcp.tier_cache import (
  56    TierFilter,
  57    enrich_rows_by_org,
  58    filter_rows_by_tier,
  59    get_org_tiers,
  60)
  61
  62
  63class StatusFilter(StrEnum):
  64    """Filter for job status in sync queries."""
  65
  66    ALL = "all"
  67    SUCCEEDED = "succeeded"
  68    FAILED = "failed"
  69
  70
  71# Cloud UI base URL for building connection URLs
  72CLOUD_UI_BASE_URL = "https://cloud.airbyte.com"
  73
  74
  75def _validate_sync_activity_scope(
  76    *,
  77    organization_id: str | None,
  78    workspace_id: str | None,
  79    connection_ids: list[str] | None,
  80) -> None:
  81    """Require at least one explicit scope filter for sync activity queries."""
  82    if organization_id or workspace_id or connection_ids:
  83        return
  84    raise PyAirbyteInputError(
  85        message=(
  86            "Provide at least one scope filter: `organization_id`, `workspace_id`, "
  87            "or `connection_ids`."
  88        ),
  89        context={
  90            "organization_id": organization_id,
  91            "workspace_id": workspace_id,
  92            "connection_ids": connection_ids,
  93        },
  94    )
  95
  96
  97def _validate_sync_activity_window(
  98    *,
  99    start_at: datetime,
 100    end_at: datetime,
 101) -> tuple[datetime, datetime]:
 102    """Validate that `start_at` and `end_at` describe a usable window.
 103
 104    Returns the timestamps normalized to UTC. Raises `PyAirbyteInputError` for
 105    naive timestamps or inverted ranges. No clock-relative caps are enforced
 106    here; the caller is trusted to choose a sensible window.
 107    """
 108    if start_at.tzinfo is None or end_at.tzinfo is None:
 109        raise PyAirbyteInputError(
 110            message="`start_at` and `end_at` must include timezone information.",
 111            context={
 112                "start_at": start_at.isoformat(),
 113                "end_at": end_at.isoformat(),
 114            },
 115        )
 116
 117    normalized_start = start_at.astimezone(timezone.utc)
 118    normalized_end = end_at.astimezone(timezone.utc)
 119
 120    if normalized_start >= normalized_end:
 121        raise PyAirbyteInputError(
 122            message="`start_at` must be earlier than `end_at`.",
 123            context={
 124                "start_at": normalized_start.isoformat(),
 125                "end_at": normalized_end.isoformat(),
 126            },
 127        )
 128    return normalized_start, normalized_end
 129
 130
 131# =============================================================================
 132# Pydantic Models for MCP Tool Responses
 133# =============================================================================
 134
 135
 136class OrganizationSearchHit(BaseModel):
 137    """A single organization returned by a name/email search."""
 138
 139    organization_id: str = Field(description="The organization UUID")
 140    organization_name: str = Field(description="The name of the organization")
 141    email: str | None = Field(
 142        default=None, description="The email address associated with the organization"
 143    )
 144    customer_tier: str | None = Field(
 145        default=None,
 146        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
 147    )
 148
 149
 150class OrganizationSearchResult(BaseModel):
 151    """Result of searching organizations by name substring."""
 152
 153    name_contains: str = Field(description="The search substring that was used")
 154    total_found: int = Field(description="Total number of organizations matching")
 155    organizations: list[OrganizationSearchHit] = Field(
 156        description="List of matching organizations"
 157    )
 158
 159
 160class WorkspaceInfo(BaseModel):
 161    """Information about a workspace."""
 162
 163    organization_id: str = Field(description="The organization UUID")
 164    workspace_id: str = Field(description="The workspace UUID")
 165    workspace_name: str = Field(description="The name of the workspace")
 166    slug: str | None = Field(
 167        default=None, description="The workspace slug (URL-friendly identifier)"
 168    )
 169    email: str | None = Field(
 170        default=None, description="The email address associated with the workspace"
 171    )
 172    dataplane_group_id: str | None = Field(
 173        default=None, description="The dataplane group UUID (region)"
 174    )
 175    dataplane_name: str | None = Field(
 176        default=None, description="The name of the dataplane (e.g., 'US', 'EU')"
 177    )
 178    created_at: datetime | None = Field(
 179        default=None, description="When the workspace was created"
 180    )
 181    customer_tier: str | None = Field(
 182        default=None,
 183        description="Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache.",
 184    )
 185    is_eu: bool | None = Field(
 186        default=None,
 187        description="Whether the workspace is in the EU region (derived from dataplane_name).",
 188    )
 189
 190
 191class WorkspaceSearchResult(BaseModel):
 192    """Result of searching workspaces by name or email domain."""
 193
 194    name_contains: str | None = Field(
 195        default=None, description="The name substring that was searched for"
 196    )
 197    email_domain: str | None = Field(
 198        default=None,
 199        description="The email domain that was searched for (e.g., 'motherduck.com')",
 200    )
 201    total_workspaces_found: int = Field(
 202        description="Total number of workspaces matching"
 203    )
 204    unique_organization_ids: list[str] = Field(
 205        description="List of unique organization IDs found"
 206    )
 207    workspaces: list[WorkspaceInfo] = Field(description="List of matching workspaces")
 208
 209
 210# Keep backward-compatible alias for any external references
 211WorkspacesByEmailDomainResult = WorkspaceSearchResult
 212
 213
 214class LatestAttemptBreakdown(BaseModel):
 215    """Breakdown of connections by latest attempt status."""
 216
 217    succeeded: int = Field(
 218        default=0, description="Connections where latest attempt succeeded"
 219    )
 220    failed: int = Field(
 221        default=0, description="Connections where latest attempt failed"
 222    )
 223    cancelled: int = Field(
 224        default=0, description="Connections where latest attempt was cancelled"
 225    )
 226    running: int = Field(
 227        default=0, description="Connections where latest attempt is still running"
 228    )
 229    unknown: int = Field(
 230        default=0,
 231        description="Connections with no recent attempts in the lookback window",
 232    )
 233
 234
 235class VersionPinStats(BaseModel):
 236    """Stats for connections pinned to a specific version."""
 237
 238    pinned_version_id: str | None = Field(
 239        description="The connector version UUID (None for unpinned connections)"
 240    )
 241    docker_image_tag: str | None = Field(
 242        default=None, description="The docker image tag for this version"
 243    )
 244    total_connections: int = Field(description="Total number of connections")
 245    enabled_connections: int = Field(
 246        description="Number of enabled (active status) connections"
 247    )
 248    active_connections: int = Field(
 249        description="Number of connections with recent sync activity"
 250    )
 251    latest_attempt: LatestAttemptBreakdown = Field(
 252        description="Breakdown by latest attempt status"
 253    )
 254
 255
 256class ConnectorConnectionStats(BaseModel):
 257    """Aggregate connection stats for a connector."""
 258
 259    connector_definition_id: str = Field(description="The connector definition UUID")
 260    connector_type: str = Field(description="'source' or 'destination'")
 261    canonical_name: str | None = Field(
 262        default=None, description="The canonical connector name if resolved"
 263    )
 264    total_connections: int = Field(
 265        description="Total number of non-deprecated connections"
 266    )
 267    enabled_connections: int = Field(
 268        description="Number of enabled (active status) connections"
 269    )
 270    active_connections: int = Field(
 271        description="Number of connections with recent sync activity"
 272    )
 273    pinned_connections: int = Field(
 274        description="Number of connections with explicit version pins"
 275    )
 276    unpinned_connections: int = Field(
 277        description="Number of connections on default version"
 278    )
 279    latest_attempt: LatestAttemptBreakdown = Field(
 280        description="Overall breakdown by latest attempt status"
 281    )
 282    by_version: list[VersionPinStats] = Field(
 283        description="Stats broken down by pinned version"
 284    )
 285
 286
 287class ConnectorConnectionStatsResponse(BaseModel):
 288    """Response containing connection stats for multiple connectors."""
 289
 290    sources: list[ConnectorConnectionStats] = Field(
 291        default_factory=list, description="Stats for source connectors"
 292    )
 293    destinations: list[ConnectorConnectionStats] = Field(
 294        default_factory=list, description="Stats for destination connectors"
 295    )
 296    lookback_days: int = Field(
 297        description="Lookback window used for 'active' connections"
 298    )
 299    generated_at: datetime = Field(description="When this response was generated")
 300
 301
 302def _opt_str(value: Any) -> str | None:
 303    """Convert a nullable value to str, returning None if the value is None/falsy."""
 304    return str(value) if value else None
 305
 306
 307@mcp_tool(
 308    read_only=True,
 309    idempotent=True,
 310)
 311def query_prod_dataplanes() -> list[dict[str, Any]]:
 312    """List all dataplane groups with workspace counts.
 313
 314    Returns information about all active dataplane groups in Airbyte Cloud,
 315    including the number of workspaces in each. Useful for understanding
 316    the distribution of workspaces across regions (US, US-Central, EU).
 317
 318    Returns list of dicts with keys: dataplane_group_id, dataplane_name,
 319    organization_id, enabled, tombstone, created_at, workspace_count
 320    """
 321    return query_dataplanes_list()
 322
 323
 324@mcp_tool(
 325    read_only=True,
 326    idempotent=True,
 327)
 328def query_prod_workspace_info(
 329    workspace_id: Annotated[
 330        str | WorkspaceAliasEnum,
 331        Field(
 332            description="Workspace UUID or alias to look up. "
 333            "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
 334        ),
 335    ],
 336) -> dict[str, Any] | None:
 337    """Get workspace information including dataplane group.
 338
 339    Returns details about a specific workspace, including which dataplane
 340    (region) it belongs to. Useful for determining if a workspace is in
 341    the EU region for filtering purposes.
 342
 343    Returns dict with keys: workspace_id, workspace_name, slug, organization_id,
 344    dataplane_group_id, dataplane_name, created_at, tombstone
 345    Returns None if the workspace is not found.
 346    """
 347    # Resolve workspace ID alias (workspace_id is required, so resolved value is never None)
 348    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
 349    assert resolved_workspace_id is not None  # Type narrowing: workspace_id is required
 350
 351    return query_workspace_info(resolved_workspace_id)
 352
 353
 354@mcp_tool(
 355    read_only=True,
 356    idempotent=True,
 357)
 358def query_prod_connector_versions(
 359    connector_definition_id: Annotated[
 360        str,
 361        Field(description="Connector definition UUID to list versions for"),
 362    ],
 363) -> list[dict[str, Any]]:
 364    """List all versions for a connector definition.
 365
 366    Returns all published versions of a connector, ordered by last_published
 367    date descending. Useful for understanding version history and finding
 368    specific version IDs for pinning or rollout monitoring.
 369
 370    Returns list of dicts with keys: version_id, docker_image_tag, docker_repository,
 371    release_stage, support_level, cdk_version, language, last_published, release_date
 372    """
 373    return query_connector_versions(connector_definition_id)
 374
 375
 376@mcp_tool(
 377    read_only=True,
 378    idempotent=True,
 379)
 380def query_prod_new_connector_releases(
 381    days: Annotated[
 382        int,
 383        Field(description="Number of days to look back (default: 7)", default=7),
 384    ] = 7,
 385    limit: Annotated[
 386        int,
 387        Field(description="Maximum number of results (default: 100)", default=100),
 388    ] = 100,
 389) -> list[dict[str, Any]]:
 390    """List recently published connector versions.
 391
 392    Returns connector versions published within the specified number of days.
 393    Uses last_published timestamp which reflects when the version was actually
 394    deployed to the registry (not the changelog date).
 395
 396    Returns list of dicts with keys: version_id, connector_definition_id, docker_repository,
 397    docker_image_tag, last_published, release_date, release_stage, support_level,
 398    cdk_version, language, created_at
 399    """
 400    return query_new_connector_releases(days=days, limit=limit)
 401
 402
 403@mcp_tool(
 404    read_only=True,
 405    idempotent=True,
 406)
 407def query_prod_actors_by_pinned_connector_version(
 408    connector_version_id: Annotated[
 409        str,
 410        Field(description="Connector version UUID to find pinned instances for"),
 411    ],
 412) -> list[dict[str, Any]]:
 413    """List actors (sources/destinations) effectively pinned to a specific connector version.
 414
 415    Returns all actors that are effectively pinned to a specific connector version,
 416    considering all scope levels: actor-level pins, workspace-level pins, and
 417    organization-level pins (with actor > workspace > organization precedence).
 418    Useful for monitoring rollouts and understanding which customers are affected.
 419
 420    The actor_id field is the actor ID (superset of source_id/destination_id).
 421
 422    Returns list of dicts with keys: actor_id, connector_definition_id, origin_type,
 423    origin, description, created_at, expires_at, pin_scope_type, actor_name,
 424    workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name
 425
 426    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
 427    level the effective pin came from.
 428    """
 429    return query_actors_pinned_to_version(connector_version_id)
 430
 431
 432@mcp_tool(
 433    read_only=True,
 434    idempotent=True,
 435)
 436def query_prod_recent_syncs_for_connector_version(
 437    connector_version_id: Annotated[
 438        str | None,
 439        Field(
 440            description=(
 441                "Connector version UUID. Provide this OR "
 442                "connector_name + connector_version."
 443            ),
 444            default=None,
 445        ),
 446    ] = None,
 447    connector_name: Annotated[
 448        str | None,
 449        Field(
 450            description=(
 451                "Canonical connector name (e.g. `source-pokeapi`, "
 452                "`destination-duckdb`). Used with `connector_version` to "
 453                "resolve the version UUID."
 454            ),
 455            default=None,
 456        ),
 457    ] = None,
 458    connector_version: Annotated[
 459        str | None,
 460        Field(
 461            description=(
 462                "Semver version tag (e.g. `0.3.59`). "
 463                "Used with `connector_name` to resolve the version UUID."
 464            ),
 465            default=None,
 466        ),
 467    ] = None,
 468    days: Annotated[
 469        int,
 470        Field(description="Number of days to look back (default: 7)", default=7),
 471    ] = 7,
 472    limit: Annotated[
 473        int,
 474        Field(description="Maximum number of results (default: 100)", default=100),
 475    ] = 100,
 476    successful_only: Annotated[
 477        bool,
 478        Field(
 479            description="If `True`, only return successful syncs (default: `False`)",
 480            default=False,
 481        ),
 482    ] = False,
 483) -> list[dict[str, Any]]:
 484    """List sync jobs that were run with a specific connector version.
 485
 486    Works for both source and destination connectors. Automatically detects
 487    the connector type from the version metadata and uses the appropriate
 488    query variant.
 489
 490    Accepts either `connector_version_id` (UUID) or `connector_name` +
 491    `connector_version` (e.g. `source-pokeapi` + `0.3.59`). When using
 492    name + version, the `docker_repository` is derived from the canonical
 493    name (e.g. `source-pokeapi` → `airbyte/source-pokeapi`).
 494
 495    Filters on the version stamped into `jobs.config` at job-creation time,
 496    not the current pin state. This avoids false positives (pre-pin syncs
 497    counted as release-candidate syncs) and false negatives (post-unpin syncs missed).
 498
 499    Pin columns (`pin_origin_type`, `pin_origin`, `pin_scope_type`) are
 500    still included as informational output but are not used for filtering.
 501
 502    Returns list of dicts with keys: `job_id`, `connection_id`, `job_status`,
 503    `started_at`, `job_updated_at`, `connection_name`, `actor_id`, `actor_name`,
 504    `actor_definition_id`, `source_definition_version_id`,
 505    `destination_definition_version_id`, `pin_origin_type`,
 506    `pin_origin`, `pin_scope_type`, `workspace_id`, `workspace_name`,
 507    `organization_id`, `dataplane_group_id`, `dataplane_name`.
 508    """
 509    # Resolve inputs to a version UUID and connector type.
 510    if connector_version_id is not None:
 511        version_info = resolve_version_info(connector_version_id)
 512        docker_repository = version_info["docker_repository"]
 513    elif connector_name is not None and connector_version is not None:
 514        # Derive docker_repository from canonical name.
 515        docker_repository = f"airbyte/{connector_name}"
 516        version_info = resolve_version_id_by_tag(
 517            docker_repository=docker_repository,
 518            docker_image_tag=connector_version,
 519        )
 520        connector_version_id = version_info["version_id"]
 521    else:
 522        raise PyAirbyteInputError(
 523            message=(
 524                "Provide either `connector_version_id` or both "
 525                "`connector_name` and `connector_version`."
 526            ),
 527        )
 528
 529    is_destination = not is_source_connector(docker_repository)
 530    return query_syncs_for_connector_version(
 531        connector_version_id,
 532        is_destination=is_destination,
 533        days=days,
 534        limit=limit,
 535        successful_only=successful_only,
 536    )
 537
 538
 539@mcp_tool(
 540    read_only=True,
 541    idempotent=True,
 542    open_world=True,
 543)
 544def query_prod_recent_syncs_for_connector(
 545    source_definition_id: Annotated[
 546        str | None,
 547        Field(
 548            description=(
 549                "Source connector definition ID (UUID) to search for. "
 550                "Provide this OR source_canonical_name OR destination_definition_id "
 551                "OR destination_canonical_name (exactly one required). "
 552                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
 553            ),
 554            default=None,
 555        ),
 556    ],
 557    source_canonical_name: Annotated[
 558        str | None,
 559        Field(
 560            description=(
 561                "Canonical source connector name to search for. "
 562                "Provide this OR source_definition_id OR destination_definition_id "
 563                "OR destination_canonical_name (exactly one required). "
 564                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
 565            ),
 566            default=None,
 567        ),
 568    ],
 569    destination_definition_id: Annotated[
 570        str | None,
 571        Field(
 572            description=(
 573                "Destination connector definition ID (UUID) to search for. "
 574                "Provide this OR destination_canonical_name OR source_definition_id "
 575                "OR source_canonical_name (exactly one required). "
 576                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
 577            ),
 578            default=None,
 579        ),
 580    ],
 581    destination_canonical_name: Annotated[
 582        str | None,
 583        Field(
 584            description=(
 585                "Canonical destination connector name to search for. "
 586                "Provide this OR destination_definition_id OR source_definition_id "
 587                "OR source_canonical_name (exactly one required). "
 588                "Examples: 'destination-duckdb', 'DuckDB'."
 589            ),
 590            default=None,
 591        ),
 592    ],
 593    status_filter: Annotated[
 594        StatusFilter,
 595        Field(
 596            description=(
 597                "Filter by job status: 'all' (default), 'succeeded', or 'failed'. "
 598                "Use 'succeeded' to find healthy connections with recent successful syncs. "
 599                "Use 'failed' to find connections with recent failures."
 600            ),
 601            default=StatusFilter.ALL,
 602        ),
 603    ],
 604    organization_id: Annotated[
 605        str | OrganizationAliasEnum | None,
 606        Field(
 607            description=(
 608                "Optional organization ID (UUID) or alias to filter results. "
 609                "If provided, only syncs from this organization will be returned. "
 610                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
 611            ),
 612            default=None,
 613        ),
 614    ],
 615    lookback_days: Annotated[
 616        int,
 617        Field(description="Number of days to look back (default: 7)", default=7),
 618    ],
 619    limit: Annotated[
 620        int,
 621        Field(description="Maximum number of results (default: 100)", default=100),
 622    ],
 623    customer_tier_filter: Annotated[
 624        TierFilter,
 625        Field(
 626            description=(
 627                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2' (default), or 'ALL'. "
 628                "Filters results to only include connections belonging to organizations "
 629                "in the specified tier. Use 'ALL' to include all tiers."
 630            ),
 631        ),
 632    ] = "TIER_2",
 633    *,
 634    exclude_pinned: Annotated[
 635        bool,
 636        Field(
 637            description=(
 638                "If True, exclude syncs for actors that are already pinned to a "
 639                "specific version (at any scope level: actor, workspace, or organization). "
 640                "Useful for 'prove fix' workflows where you want to find unpinned "
 641                "connections for live testing. Default: False (include all syncs)."
 642            ),
 643            default=False,
 644        ),
 645    ],
 646    enabled_schedules_only: Annotated[
 647        bool,
 648        Field(
 649            description=(
 650                "If True, only return syncs for connections that are both active "
 651                "(not paused/inactive) and on an automated sync schedule "
 652                "(not manual-trigger-only). Useful for canary workflows where "
 653                "you need connections that will produce organic syncs during a "
 654                "monitoring window. Default: False (include all connections)."
 655            ),
 656            default=False,
 657        ),
 658    ],
 659) -> list[dict[str, Any]]:
 660    """List recent sync jobs for ALL actors using a connector type.
 661
 662    This tool finds all actors with the given connector definition and returns their
 663    recent sync jobs, regardless of whether they have explicit version pins. It filters
 664    out deleted actors, deleted workspaces, and deprecated connections.
 665
 666    Results are always enriched with customer_tier and is_eu fields.
 667    The customer_tier_filter parameter defaults to 'TIER_2'; pass 'ALL' for all tiers.
 668
 669    Use this tool to:
 670    - Find healthy connections with recent successful syncs (status_filter='succeeded')
 671    - Investigate connector issues across all users (status_filter='failed')
 672    - Get an overview of all recent sync activity (status_filter='all')
 673
 674    Set `exclude_pinned=True` to filter out syncs for actors that are already pinned to a
 675    specific version. This is useful for 'prove fix' live connection testing workflows
 676    where you want to find unpinned connections to test against.
 677
 678    Set `enabled_schedules_only=True` to restrict results to connections that are both
 679    enabled (status='active') and on an automated schedule (not manual-trigger-only).
 680    This is useful for canary prerelease workflows where you need connections that
 681    will run organically during the monitoring window.
 682
 683    Supports both SOURCE and DESTINATION connectors. Provide exactly one of:
 684    source_definition_id, source_canonical_name, destination_definition_id,
 685    or destination_canonical_name.
 686
 687    Key fields in results:
 688    - job_status: 'succeeded', 'failed', 'cancelled', etc.
 689    - connection_id, connection_name: The connection that ran the sync
 690    - actor_id, actor_name: The source or destination actor
 691    - customer_tier: TIER_0, TIER_1, or TIER_2
 692    - is_eu: Whether the workspace is in the EU region
 693    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
 694    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
 695    """
 696    # Validate that exactly one connector parameter is provided
 697    provided_params = [
 698        source_definition_id,
 699        source_canonical_name,
 700        destination_definition_id,
 701        destination_canonical_name,
 702    ]
 703    num_provided = sum(p is not None for p in provided_params)
 704    if num_provided != 1:
 705        raise PyAirbyteInputError(
 706            message=(
 707                "Exactly one of source_definition_id, source_canonical_name, "
 708                "destination_definition_id, or destination_canonical_name must be provided."
 709            ),
 710        )
 711
 712    # Determine if this is a destination connector
 713    is_destination = (
 714        destination_definition_id is not None or destination_canonical_name is not None
 715    )
 716
 717    # Resolve canonical name to definition ID if needed
 718    resolved_definition_id: str
 719    if source_canonical_name is not None:
 720        resolved_definition_id = resolve_canonical_name_to_definition_id(
 721            canonical_name=source_canonical_name,
 722        )
 723    elif destination_canonical_name is not None:
 724        resolved_definition_id = resolve_canonical_name_to_definition_id(
 725            canonical_name=destination_canonical_name,
 726        )
 727    elif source_definition_id is not None:
 728        resolved_definition_id = source_definition_id
 729    else:
 730        # We've validated exactly one param is provided, so this must be set
 731        assert destination_definition_id is not None
 732        resolved_definition_id = destination_definition_id
 733
 734    # Resolve organization ID alias
 735    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
 736
 737    rows = query_recent_syncs_for_connector(
 738        connector_definition_id=resolved_definition_id,
 739        is_destination=is_destination,
 740        status_filter=status_filter,
 741        organization_id=resolved_organization_id,
 742        days=lookback_days,
 743        limit=limit,
 744        exclude_pinned=exclude_pinned,
 745        enabled_schedules_only=enabled_schedules_only,
 746    )
 747
 748    enriched = enrich_rows_by_org(rows)
 749    return filter_rows_by_tier(enriched, customer_tier_filter)
 750
 751
 752@mcp_tool(
 753    read_only=True,
 754    idempotent=True,
 755    open_world=True,
 756)
 757def query_prod_failed_sync_attempts_for_connector(
 758    source_definition_id: Annotated[
 759        str | None,
 760        Field(
 761            description=(
 762                "Source connector definition ID (UUID) to search for. "
 763                "Exactly one of this or source_canonical_name is required. "
 764                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
 765            ),
 766            default=None,
 767        ),
 768    ] = None,
 769    source_canonical_name: Annotated[
 770        str | None,
 771        Field(
 772            description=(
 773                "Canonical source connector name to search for. "
 774                "Exactly one of this or source_definition_id is required. "
 775                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
 776            ),
 777            default=None,
 778        ),
 779    ] = None,
 780    organization_id: Annotated[
 781        str | OrganizationAliasEnum | None,
 782        Field(
 783            description=(
 784                "Optional organization ID (UUID) or alias to filter results. "
 785                "If provided, only failed attempts from this organization will be returned. "
 786                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
 787            ),
 788            default=None,
 789        ),
 790    ] = None,
 791    lookback_days: Annotated[
 792        int,
 793        Field(description="Number of days to look back (default: 7)", default=7),
 794    ] = 7,
 795    limit: Annotated[
 796        int,
 797        Field(description="Maximum number of results (default: 100)", default=100),
 798    ] = 100,
 799    customer_tier_filter: Annotated[
 800        TierFilter,
 801        Field(
 802            description=(
 803                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2' (default), or 'ALL'. "
 804                "Filters results to only include connections belonging to organizations "
 805                "in the specified tier. Use 'ALL' to include all tiers."
 806            ),
 807        ),
 808    ] = "TIER_2",
 809) -> list[dict[str, Any]]:
 810    """List failed sync attempts for ALL actors using a source connector type.
 811
 812    This tool finds all actors with the given connector definition and returns their
 813    failed sync attempts, regardless of whether they have explicit version pins.
 814
 815    Results are always enriched with customer_tier and is_eu fields.
 816    The customer_tier_filter parameter defaults to 'TIER_2'; pass 'ALL' for all tiers.
 817
 818    This is useful for investigating connector issues across all users. Use this when
 819    you want to find failures for a connector type regardless of which version users
 820    are on.
 821
 822    Note: This tool only supports SOURCE connectors. For destination connectors,
 823    a separate tool would be needed.
 824
 825    Key fields in results:
 826    - failure_summary: JSON containing failure details including failureType and messages
 827    - customer_tier: TIER_0, TIER_1, or TIER_2
 828    - is_eu: Whether the workspace is in the EU region
 829    - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
 830    - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
 831    """
 832    # Validate that exactly one of the two parameters is provided
 833    if (source_definition_id is None) == (source_canonical_name is None):
 834        raise PyAirbyteInputError(
 835            message=(
 836                "Exactly one of source_definition_id or source_canonical_name "
 837                "must be provided."
 838            ),
 839        )
 840
 841    # Resolve canonical name to definition ID if needed
 842    resolved_definition_id: str
 843    if source_canonical_name is not None:
 844        resolved_definition_id = resolve_canonical_name_to_definition_id(
 845            canonical_name=source_canonical_name,
 846        )
 847    else:
 848        resolved_definition_id = source_definition_id  # type: ignore[assignment]
 849
 850    # Resolve organization ID alias
 851    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
 852
 853    rows = query_failed_sync_attempts_for_connector(
 854        connector_definition_id=resolved_definition_id,
 855        organization_id=resolved_organization_id,
 856        days=lookback_days,
 857        limit=limit,
 858    )
 859    enriched = enrich_rows_by_org(rows)
 860    return filter_rows_by_tier(enriched, customer_tier_filter)
 861
 862
 863@mcp_tool(
 864    read_only=True,
 865    idempotent=True,
 866    open_world=True,
 867)
 868def query_prod_connections_by_connector(
 869    source_definition_id: Annotated[
 870        str | None,
 871        Field(
 872            description=(
 873                "Source connector definition ID (UUID) to search for. "
 874                "Exactly one of source_definition_id, source_canonical_name, "
 875                "destination_definition_id, or destination_canonical_name is required. "
 876                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
 877            ),
 878            default=None,
 879        ),
 880    ] = None,
 881    source_canonical_name: Annotated[
 882        str | None,
 883        Field(
 884            description=(
 885                "Canonical source connector name to search for. "
 886                "Exactly one of source_definition_id, source_canonical_name, "
 887                "destination_definition_id, or destination_canonical_name is required. "
 888                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
 889            ),
 890            default=None,
 891        ),
 892    ] = None,
 893    destination_definition_id: Annotated[
 894        str | None,
 895        Field(
 896            description=(
 897                "Destination connector definition ID (UUID) to search for. "
 898                "Exactly one of source_definition_id, source_canonical_name, "
 899                "destination_definition_id, or destination_canonical_name is required. "
 900                "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
 901            ),
 902            default=None,
 903        ),
 904    ] = None,
 905    destination_canonical_name: Annotated[
 906        str | None,
 907        Field(
 908            description=(
 909                "Canonical destination connector name to search for. "
 910                "Exactly one of source_definition_id, source_canonical_name, "
 911                "destination_definition_id, or destination_canonical_name is required. "
 912                "Examples: 'destination-duckdb', 'DuckDB'."
 913            ),
 914            default=None,
 915        ),
 916    ] = None,
 917    organization_id: Annotated[
 918        str | OrganizationAliasEnum | None,
 919        Field(
 920            description=(
 921                "Optional organization ID (UUID) or alias to filter results. "
 922                "If provided, only connections in this organization will be returned. "
 923                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
 924            ),
 925            default=None,
 926        ),
 927    ] = None,
 928    limit: Annotated[
 929        int,
 930        Field(description="Maximum number of results (default: 1000)", default=1000),
 931    ] = 1000,
 932    customer_tier_filter: Annotated[
 933        TierFilter,
 934        Field(
 935            description=(
 936                "Tier filter: 'TIER_0', 'TIER_1', 'TIER_2' (default), or 'ALL'. "
 937                "Filters results to only include connections belonging to organizations "
 938                "in the specified tier. Use 'ALL' to include all tiers."
 939            ),
 940        ),
 941    ] = "TIER_2",
 942    *,
 943    exclude_pinned: Annotated[
 944        bool,
 945        Field(
 946            description=(
 947                "If True, exclude connections whose connector is already pinned to a "
 948                "specific version (at any scope level: actor, workspace, or organization). "
 949                "Useful for 'prove fix' workflows where you want to find unpinned "
 950                "connections for live testing. Default: False (include all connections)."
 951            ),
 952            default=False,
 953        ),
 954    ],
 955    enabled_schedules_only: Annotated[
 956        bool,
 957        Field(
 958            description=(
 959                "If True, only return connections that are both active "
 960                "(not paused/inactive) and on an automated sync schedule "
 961                "(not manual-trigger-only). Useful for canary workflows where "
 962                "you need connections that will produce organic syncs during a "
 963                "monitoring window. Default: False (include all connections)."
 964            ),
 965            default=False,
 966        ),
 967    ],
 968) -> list[dict[str, Any]]:
 969    """Search for all connections using a specific source or destination connector type.
 970
 971    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
 972    It finds all connections where the source or destination connector matches the
 973    specified type, regardless of how the connector is named by users.
 974
 975    Results are always enriched with customer_tier and is_eu fields.
 976    The customer_tier_filter parameter defaults to 'TIER_2'; pass 'ALL' for all tiers.
 977
 978    Optionally filter by organization_id to limit results to a specific organization.
 979    Use '@airbyte-internal' as an alias for the Airbyte internal organization.
 980
 981    Set `exclude_pinned=True` to filter out connections that are already pinned to a
 982    specific version. This is useful for 'prove fix' live connection testing workflows
 983    where you want to find unpinned connections to test against.
 984
 985    Set `enabled_schedules_only=True` to restrict results to connections that are both
 986    enabled (status='active') and on an automated schedule (not manual-trigger-only).
 987    This is useful for canary prerelease workflows where you need connections that
 988    will run organically during the monitoring window.
 989
 990    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
 991    For source queries, returns: connection_id, connection_name, connection_url, source_id,
 992    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
 993    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
 994    pin_scope_type, customer_tier, is_eu.
 995    For destination queries, returns: connection_id, connection_name, connection_url,
 996    destination_id, destination_name, destination_definition_id, workspace_id,
 997    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
 998    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
 999
1000    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
1001    level the effective pin came from (NULL if not pinned).
1002    """
1003    # Validate that exactly one of the four connector parameters is provided
1004    provided_params = [
1005        source_definition_id,
1006        source_canonical_name,
1007        destination_definition_id,
1008        destination_canonical_name,
1009    ]
1010    num_provided = sum(p is not None for p in provided_params)
1011    if num_provided != 1:
1012        raise PyAirbyteInputError(
1013            message=(
1014                "Exactly one of source_definition_id, source_canonical_name, "
1015                "destination_definition_id, or destination_canonical_name must be provided."
1016            ),
1017        )
1018
1019    # Determine if this is a source or destination query and resolve the definition ID
1020    is_source_query = (
1021        source_definition_id is not None or source_canonical_name is not None
1022    )
1023    resolved_definition_id: str
1024
1025    if source_canonical_name is not None:
1026        resolved_definition_id = resolve_canonical_name_to_definition_id(
1027            canonical_name=source_canonical_name,
1028        )
1029    elif source_definition_id is not None:
1030        resolved_definition_id = source_definition_id
1031    elif destination_canonical_name is not None:
1032        resolved_definition_id = resolve_canonical_name_to_definition_id(
1033            canonical_name=destination_canonical_name,
1034        )
1035    else:
1036        resolved_definition_id = destination_definition_id  # type: ignore[assignment]
1037
1038    # Resolve organization ID alias
1039    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
1040
1041    # Query the database based on connector type
1042    if is_source_query:
1043        rows = [
1044            {
1045                "organization_id": str(row.get("organization_id", "")),
1046                "workspace_id": str(row["workspace_id"]),
1047                "workspace_name": row.get("workspace_name", ""),
1048                "connection_id": str(row["connection_id"]),
1049                "connection_name": row.get("connection_name", ""),
1050                "connection_url": (
1051                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
1052                    f"/connections/{row['connection_id']}/status"
1053                ),
1054                "source_id": str(row["source_id"]),
1055                "source_name": row.get("source_name", ""),
1056                "source_definition_id": str(row["source_definition_id"]),
1057                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
1058                "dataplane_name": row.get("dataplane_name", ""),
1059                "pin_origin_type": row.get("pin_origin_type"),
1060                "pin_origin": row.get("pin_origin"),
1061                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
1062                "pin_scope_type": row.get("pin_scope_type"),
1063            }
1064            for row in query_connections_by_connector(
1065                connector_definition_id=resolved_definition_id,
1066                organization_id=resolved_organization_id,
1067                limit=limit,
1068                exclude_pinned=exclude_pinned,
1069                enabled_schedules_only=enabled_schedules_only,
1070            )
1071        ]
1072    else:
1073        # Destination query
1074        rows = [
1075            {
1076                "organization_id": str(row.get("organization_id", "")),
1077                "workspace_id": str(row["workspace_id"]),
1078                "workspace_name": row.get("workspace_name", ""),
1079                "connection_id": str(row["connection_id"]),
1080                "connection_name": row.get("connection_name", ""),
1081                "connection_url": (
1082                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
1083                    f"/connections/{row['connection_id']}/status"
1084                ),
1085                "destination_id": str(row["destination_id"]),
1086                "destination_name": row.get("destination_name", ""),
1087                "destination_definition_id": str(row["destination_definition_id"]),
1088                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
1089                "dataplane_name": row.get("dataplane_name", ""),
1090                "pin_origin_type": row.get("pin_origin_type"),
1091                "pin_origin": row.get("pin_origin"),
1092                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
1093                "pin_scope_type": row.get("pin_scope_type"),
1094            }
1095            for row in query_connections_by_destination_connector(
1096                connector_definition_id=resolved_definition_id,
1097                organization_id=resolved_organization_id,
1098                limit=limit,
1099                exclude_pinned=exclude_pinned,
1100                enabled_schedules_only=enabled_schedules_only,
1101            )
1102        ]
1103
1104    enriched = enrich_rows_by_org(rows)
1105    return filter_rows_by_tier(enriched, customer_tier_filter)
1106
1107
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It's particularly useful when validating
    connector fixes that affect specific streams - you can quickly find
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
            ),
        )

    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        assert source_definition_id is not None
        resolved_definition_id = source_definition_id

    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = [
        {
            "organization_id": str(row.get("organization_id", "")),
            "workspace_id": str(row["workspace_id"]),
            "workspace_name": row.get("workspace_name", ""),
            "connection_id": str(row["connection_id"]),
            "connection_name": row.get("connection_name", ""),
            "connection_status": row.get("connection_status", ""),
            "connection_url": (
                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                f"/connections/{row['connection_id']}/status"
            ),
            "source_id": str(row["source_id"]),
            "source_name": row.get("source_name", ""),
            "source_definition_id": str(row["source_definition_id"]),
            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
            "dataplane_name": row.get("dataplane_name", ""),
        }
        for row in query_connections_by_stream(
            connector_definition_id=resolved_definition_id,
            stream_name=stream_name,
            organization_id=resolved_organization_id,
            limit=limit,
        )
    ]
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


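The "exactly one of" check in `query_prod_connections_by_stream` can be tried in isolation. The following is a minimal sketch, not the module's code: `require_exactly_one` is a hypothetical helper, and the built-in `ValueError` stands in for `PyAirbyteInputError`.

```python
def require_exactly_one(**candidates: object) -> str:
    """Return the name of the single argument that is not None, else raise.

    Hypothetical helper for illustration; the module inlines this check.
    """
    provided = [name for name, value in candidates.items() if value is not None]
    if len(provided) != 1:
        names = " or ".join(candidates)
        # ValueError is a stand-in for PyAirbyteInputError.
        raise ValueError(f"Exactly one of {names} must be provided.")
    return provided[0]


chosen = require_exactly_one(
    source_definition_id=None,
    source_canonical_name="source-klaviyo",
)
print(chosen)
```

Counting with `is not None`, as the tool does, means an empty string still counts as "provided", so callers should pass `None` rather than `""` for the argument they are not using.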
@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_organizations(
    name_contains: Annotated[
        str,
        Field(
            description=(
                "Case-insensitive substring to search for in organization name or email. "
                "For example, 'acme' will match organizations named 'Acme Corp' or "
                "with email 'admin@acme.io'."
            ),
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of organizations to return (default: 20)",
            default=20,
        ),
    ] = 20,
) -> OrganizationSearchResult:
    """Search organizations by name or email substring.

    Performs a case-insensitive substring match on organization name and email.
    Use the returned `organization_id` values with other tools like
    `query_prod_connections_by_connector` or `lookup_customer_tiers`.
    """
    rows = search_organizations(name_contains=name_contains, limit=limit)

    orgs = [
        OrganizationSearchHit(
            organization_id=str(row["organization_id"]),
            organization_name=row.get("organization_name", ""),
            email=row.get("email"),
        )
        for row in rows
    ]

    # Enrich with tier annotation
    org_ids = [o.organization_id for o in orgs]
    tier_results = {r.organization_id: r for r in get_org_tiers(org_ids)}
    for org in orgs:
        tier_result = tier_results.get(org.organization_id)
        if tier_result:
            org.customer_tier = tier_result.customer_tier

    return OrganizationSearchResult(
        name_contains=name_contains,
        total_found=len(orgs),
        organizations=orgs,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspaces(
    name_contains: Annotated[
        str | None,
        Field(
            description=(
                "Case-insensitive substring to search for in workspace name or slug. "
                "For example, 'acme' will match workspaces named 'Acme Staging'."
            ),
            default=None,
        ),
    ] = None,
    email_domain: Annotated[
        str | None,
        Field(
            description=(
                "Email domain to search for (e.g., 'motherduck.com'). "
                "Do not include the '@' symbol."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(
            description="Maximum number of workspaces to return (default: 100)",
            default=100,
        ),
    ] = 100,
) -> WorkspaceSearchResult:
    """Search workspaces by name substring or email domain.

    At least one of `name_contains` or `email_domain` must be provided.
    When `name_contains` is given, performs a case-insensitive substring match
    on workspace name and slug. When `email_domain` is given, matches
    workspaces by user email domain.

    The returned organization IDs can be used with other tools like
    `query_prod_connections_by_connector` to find connections within
    those organizations for safe testing.
    """
    if not name_contains and not email_domain:
        raise PyAirbyteInputError(
            message="At least one of `name_contains` or `email_domain` must be provided.",
        )

    if name_contains:
        rows = search_workspaces(name_contains=name_contains, limit=limit)
    else:
        assert email_domain is not None
        clean_domain = email_domain.lstrip("@")
        rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)

    workspaces = [
        WorkspaceInfo(
            organization_id=str(row["organization_id"]),
            workspace_id=str(row["workspace_id"]),
            workspace_name=row.get("workspace_name", ""),
            slug=row.get("slug"),
            email=row.get("email"),
            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
            dataplane_name=row.get("dataplane_name"),
            created_at=row.get("created_at"),
        )
        for row in rows
    ]

    # Enrich with tier annotation (annotation only, no filtering)
    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
    for ws in workspaces:
        tier_result = tier_results.get(ws.organization_id)
        if tier_result:
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspaceSearchResult(
        name_contains=name_contains,
        email_domain=email_domain.lstrip("@") if email_domain else None,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )


# Backward-compatible alias
query_prod_workspaces_by_email_domain = query_prod_workspaces


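Three small idioms in `query_prod_workspaces` are easy to misread: stripping a leading `@` from the domain, de-duplicating organization IDs while keeping first-seen order, and deriving `is_eu` from the dataplane name. The sketch below restates them as stand-alone functions; the function names are hypothetical and exist only for illustration.

```python
from typing import Optional


def normalize_email_domain(email_domain: str) -> str:
    """Drop leading '@' characters so '@example.com' and 'example.com' match."""
    return email_domain.lstrip("@")


def unique_in_order(values: list) -> list:
    """De-duplicate while keeping first-seen order (dicts preserve insertion order)."""
    return list(dict.fromkeys(values))


def is_eu(dataplane_name: Optional[str]) -> bool:
    """Mirror the tool's rule: only an exact 'EU' dataplane name counts as EU."""
    return dataplane_name == "EU"


print(normalize_email_domain("@motherduck.com"))
print(unique_in_order(["org-b", "org-a", "org-b"]))
print(is_eu("EU"), is_eu(None))
```

`dict.fromkeys` is used instead of `set` so that the order of `unique_organization_ids` follows the order of the returned workspaces.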
def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
        total_unknown += row_unknown

        by_version.append(
            VersionPinStats(
                pinned_version_id=str(version_id) if version_id else None,
                docker_image_tag=version_tags.get(str(version_id))
                if version_id
                else None,
                total_connections=row_total,
                enabled_connections=row_enabled,
                active_connections=row_active,
                latest_attempt=LatestAttemptBreakdown(
                    succeeded=row_succeeded,
                    failed=row_failed,
                    cancelled=row_cancelled,
                    running=row_running,
                    unknown=row_unknown,
                ),
            )
        )

    return ConnectorConnectionStats(
        connector_definition_id=connector_definition_id,
        connector_type=connector_type,
        canonical_name=canonical_name,
        total_connections=total_connections,
        enabled_connections=enabled_connections,
        active_connections=active_connections,
        pinned_connections=pinned_connections,
        unpinned_connections=unpinned_connections,
        latest_attempt=LatestAttemptBreakdown(
            succeeded=total_succeeded,
            failed=total_failed,
            cancelled=total_cancelled,
            running=total_running,
            unknown=total_unknown,
        ),
        by_version=by_version,
    )


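`_build_connector_stats` folds one row per pinned-version group into connector-wide totals. The sketch below shows the same roll-up on plain dictionaries. It is an illustration only: `sum_version_rows`, the reduced `COUNT_KEYS` tuple, the sample rows, and the `"v-1"` version ID are all hypothetical.

```python
from typing import Any

# A reduced set of the count columns used by the real function (assumption:
# the remaining columns roll up in exactly the same way).
COUNT_KEYS = (
    "total_connections",
    "pinned_connections",
    "unpinned_connections",
    "failed_connections",
)


def sum_version_rows(rows: list[dict[str, Any]]) -> dict[str, int]:
    """Sum each count column across per-version rows; missing columns count as 0."""
    totals = {key: 0 for key in COUNT_KEYS}
    for row in rows:
        for key in COUNT_KEYS:
            totals[key] += int(row.get(key, 0))
    return totals


rows = [
    # Unpinned group: pinned_version_id is None.
    {"pinned_version_id": None, "total_connections": 40,
     "unpinned_connections": 40, "failed_connections": 3},
    # One pinned group with a made-up version ID.
    {"pinned_version_id": "v-1", "total_connections": 2, "pinned_connections": 2},
]
print(sum_version_rows(rows))
```

The real function keeps each row as a `VersionPinStats` entry in `by_version` as well, so callers can see both the per-version breakdown and the totals.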
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connector_connection_stats(
    source_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of source connector definition IDs (UUIDs) to get stats for. "
                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
            ),
            default=None,
        ),
    ] = None,
    destination_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of destination connector definition IDs (UUIDs) to get stats for. "
                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(
            description=(
                "Number of days to look back for 'active' connections (default: 7). "
                "Connections with sync activity within this window are counted as active."
            ),
            default=7,
        ),
    ] = 7,
) -> ConnectorConnectionStatsResponse:
    """Get aggregate connection stats for multiple connectors.

    Returns counts of connections grouped by pinned version for each connector,
    including:
    - Total, enabled, and active connection counts
    - Pinned vs unpinned breakdown
    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

    This tool is designed for release monitoring workflows. It allows you to:
    1. Query recently released connectors to identify which ones to monitor
    2. Get aggregate stats showing how many connections are using each version
    3. See health metrics (pass/fail) broken down by version

    The `lookback_days` parameter controls the lookback window for:
    - Counting 'active' connections (those with recent sync activity)
    - Determining 'latest attempt status' (most recent attempt within the window)

    Connections with no sync activity in the lookback window will have
    'unknown' status in the latest_attempt breakdown.
    """
    # Initialize empty lists if None
    source_ids = source_definition_ids or []
    destination_ids = destination_definition_ids or []

    if not source_ids and not destination_ids:
        raise PyAirbyteInputError(
            message=(
                "At least one of source_definition_ids or destination_definition_ids "
                "must be provided."
            ),
        )

    sources: list[ConnectorConnectionStats] = []
    destinations: list[ConnectorConnectionStats] = []

    # Process source connectors
    for source_def_id in source_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(source_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_source_connection_stats(source_def_id, days=lookback_days)

        sources.append(
            _build_connector_stats(
                connector_definition_id=source_def_id,
                connector_type="source",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    # Process destination connectors
    for dest_def_id in destination_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(dest_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_destination_connection_stats(dest_def_id, days=lookback_days)

        destinations.append(
            _build_connector_stats(
                connector_definition_id=dest_def_id,
                connector_type="destination",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    return ConnectorConnectionStatsResponse(
        sources=sources,
        destinations=destinations,
        lookback_days=lookback_days,
        generated_at=datetime.now(timezone.utc),
    )


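The docstring of `query_prod_connector_connection_stats` says that connections with no sync activity inside the lookback window are reported as `unknown`. The underlying SQL is not shown here, so the following is only a sketch of that rule as described: `latest_attempt_bucket` is a hypothetical helper, and treating an attempt exactly at the cutoff as inside the window is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def latest_attempt_bucket(
    last_attempt_at: Optional[datetime],
    last_status: Optional[str],
    *,
    now: datetime,
    lookback_days: int = 7,
) -> str:
    """Classify one connection for the latest-attempt breakdown (illustrative)."""
    cutoff = now - timedelta(days=lookback_days)
    # Assumption: attempts older than the cutoff are treated as if absent.
    if last_attempt_at is None or last_attempt_at < cutoff:
        return "unknown"
    return last_status or "unknown"


now = datetime(2024, 1, 8, tzinfo=timezone.utc)
recent = latest_attempt_bucket(now - timedelta(days=2), "failed", now=now)
stale = latest_attempt_bucket(now - timedelta(days=30), "succeeded", now=now)
print(recent, stale)
```

Under this reading, widening `lookback_days` can move connections out of `unknown` and into a concrete status, so the breakdown should be compared only between calls that use the same window.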
# =============================================================================
# Connector Rollout Models and Tools
# =============================================================================


class ConnectorRolloutInfo(BaseModel):
    """Information about a connector rollout."""

    rollout_id: str = Field(description="The rollout UUID")
    actor_definition_id: str = Field(description="The connector definition UUID")
    state: str = Field(
        description="Rollout state: initialized, workflow_started, in_progress, "
        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
    )
    initial_rollout_pct: int | None = Field(
        default=None, description="Initial rollout percentage"
    )
    current_target_rollout_pct: int | None = Field(
        default=None, description="Current target rollout percentage"
    )
    final_target_rollout_pct: int | None = Field(
        default=None, description="Final target rollout percentage"
    )
    has_breaking_changes: bool = Field(
        description="Whether the RC has breaking changes"
    )
    max_step_wait_time_mins: int | None = Field(
        default=None, description="Maximum wait time between rollout steps in minutes"
    )
    rollout_strategy: str | None = Field(
        default=None, description="Rollout strategy: manual, automated, overridden"
    )
    updated_by_user_id: str | None = Field(
        default=None,
        description="User ID recorded as last updating the rollout",
    )
    updated_by_user_name: str | None = Field(
        default=None,
        description="Name recorded as last updating the rollout",
    )
    updated_by_user_email: str | None = Field(
        default=None,
        description="Email recorded as last updating the rollout",
    )
    workflow_run_id: str | None = Field(
        default=None, description="Temporal workflow run ID"
    )
    error_msg: str | None = Field(default=None, description="Error message if errored")
    failed_reason: str | None = Field(
        default=None, description="Reason for failure if failed"
    )
    paused_reason: str | None = Field(
        default=None, description="Reason for pause if paused"
    )
    tag: str | None = Field(default=None, description="Optional tag for the rollout")
    created_at: datetime | None = Field(
        default=None, description="When the rollout was created"
    )
    updated_at: datetime | None = Field(
        default=None, description="When the rollout was last updated"
    )
    completed_at: datetime | None = Field(
        default=None, description="When the rollout completed (if terminal)"
    )
    expires_at: datetime | None = Field(
        default=None, description="When the rollout expires"
    )
    rc_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the release candidate"
    )
    rc_docker_repository: str | None = Field(
        default=None, description="Docker repository of the release candidate"
    )
    initial_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the initial version"
    )
    initial_docker_repository: str | None = Field(
        default=None, description="Docker repository of the initial version"
    )
    filters: dict[str, Any] | None = Field(
        default=None,
        description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})",
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier targeted by this rollout (extracted from filters), "
        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
    )


def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
    """Parse the rollout filters field from a database row.

    The filters field may be a JSON string, a dict, or None.
    """
    if filters_raw is None:
        return None
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
    return None


def _extract_tier_from_filters(filters_raw: Any) -> str | None:
    """Extract customer tier from rollout filters JSON.

    Supports two formats:
    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`
    """
    parsed = _parse_rollout_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: customerTierFilters list
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and len(values) == 1:
                    return str(values[0])
                if isinstance(values, list) and len(values) > 1:
                    return ", ".join(str(v) for v in values)

    # Legacy format: tierFilter dict
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict):
        tier = tier_filter.get("tier")
        if isinstance(tier, str):
            return tier

    return None


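The two filter formats handled by `_extract_tier_from_filters` are easiest to compare side by side. The sketch below is a condensed, stand-alone restatement of that logic for experimentation, not the module's implementation; `extract_tier` is a hypothetical name, and the sample payloads are made up to match the shapes given in the docstring.

```python
import json
from typing import Any, Optional


def extract_tier(filters_raw: Any) -> Optional[str]:
    """Condensed restatement: current format is checked first, then legacy."""
    if isinstance(filters_raw, str):
        try:
            filters_raw = json.loads(filters_raw)
        except json.JSONDecodeError:
            return None
    if not isinstance(filters_raw, dict):
        return None

    # Current format: a list of filter entries; multiple tiers are comma-joined.
    tier_filters = filters_raw.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and values:
                    return ", ".join(str(v) for v in values)

    # Legacy format: a single tier string under tierFilter.
    tier_filter = filters_raw.get("tierFilter")
    if isinstance(tier_filter, dict) and isinstance(tier_filter.get("tier"), str):
        return tier_filter["tier"]
    return None


legacy = {"tierFilter": {"tier": "TIER_0"}}
current = '{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}'
multi = {"customerTierFilters": [{"name": "TIER", "value": ["TIER_0", "TIER_1"]}]}
print(extract_tier(legacy), extract_tier(current), extract_tier(multi), sep=" | ")
```

Because a multi-tier filter is flattened into one comma-separated string, consumers of `customer_tier` that need the individual tiers should read the raw `filters` field instead.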
def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
    """Convert a database row to a ConnectorRolloutInfo model."""
    return ConnectorRolloutInfo(
        rollout_id=str(row["rollout_id"]),
        actor_definition_id=str(row["actor_definition_id"]),
        state=row["state"],
        initial_rollout_pct=row.get("initial_rollout_pct"),
        current_target_rollout_pct=row.get("current_target_rollout_pct"),
        final_target_rollout_pct=row.get("final_target_rollout_pct"),
        has_breaking_changes=row["has_breaking_changes"],
        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
        rollout_strategy=row.get("rollout_strategy"),
        updated_by_user_id=str(row["updated_by_user_id"])
        if row.get("updated_by_user_id") is not None
        else None,
        updated_by_user_name=row.get("updated_by_user_name"),
        updated_by_user_email=row.get("updated_by_user_email"),
        workflow_run_id=row.get("workflow_run_id"),
        error_msg=row.get("error_msg"),
        failed_reason=row.get("failed_reason"),
        paused_reason=row.get("paused_reason"),
        tag=row.get("tag"),
        created_at=row.get("created_at"),
        updated_at=row.get("updated_at"),
        completed_at=row.get("completed_at"),
        expires_at=row.get("expires_at"),
        rc_docker_image_tag=row.get("rc_docker_image_tag"),
        rc_docker_repository=row.get("rc_docker_repository"),
        initial_docker_image_tag=row.get("initial_docker_image_tag"),
        initial_docker_repository=row.get("initial_docker_repository"),
        filters=_parse_rollout_filters(row.get("filters")),
        customer_tier=_extract_tier_from_filters(row.get("filters")),
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_rollouts(
    actor_definition_id: Annotated[
        str | None,
        Field(description="Connector definition UUID to filter by (optional)"),
    ] = None,
    rollout_id: Annotated[
        str | None,
        Field(description="Specific rollout UUID to look up (optional)"),
    ] = None,
    active_only: Annotated[
        bool,
        Field(description="If true, only return active (non-terminal) rollouts"),
    ] = False,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)"),
    ] = 100,
) -> list[ConnectorRolloutInfo]:
    """Query connector rollouts with flexible filtering.

    Returns rollouts based on the provided filters. If no filters are specified,
    returns all active rollouts. Useful for monitoring rollout status and history.

    Filter behavior:
    - rollout_id: Returns that specific rollout (ignores other filters)
    - active_only: Returns only active (non-terminal) rollouts
    - actor_definition_id: Returns rollouts for that specific connector
    - No filters: Returns all active rollouts (same as active_only=True)
    """
    rows = query_connector_rollouts(
        actor_definition_id=actor_definition_id,
        rollout_id=rollout_id,
        active_only=active_only,
        limit=limit,
    )
    return [_row_to_connector_rollout_info(row) for row in rows]


1811@mcp_tool(
1812    read_only=True,
1813    idempotent=True,
1814    open_world=True,
1815)
1816def query_prod_connection_sync_activity(
1817    start_at: Annotated[
1818        datetime,
1819        Field(
1820            description=(
1821                "Inclusive start timestamp for the sync activity window. "
1822                "Must be timezone-aware (ISO 8601 with offset or `Z`)."
1823            ),
1824        ),
1825    ],
1826    end_at: Annotated[
1827        datetime,
1828        Field(
1829            description=(
1830                "Exclusive end timestamp for the sync activity window. "
1831                "Must be timezone-aware and strictly after `start_at`."
1832            ),
1833        ),
1834    ],
1835    organization_id: Annotated[
1836        str | OrganizationAliasEnum | None,
1837        Field(
1838            description=(
1839                "Optional organization UUID or alias. At least one of "
1840                "`organization_id`, `workspace_id`, or `connection_ids` is "
1841                "required. Accepts `@airbyte-internal` as an alias for the "
1842                "Airbyte internal org."
1843            ),
1844            default=None,
1845        ),
1846    ] = None,
1847    workspace_id: Annotated[
1848        str | WorkspaceAliasEnum | None,
1849        Field(
1850            description=(
1851                "Optional workspace UUID or alias. At least one of "
1852                "`organization_id`, `workspace_id`, or `connection_ids` is "
1853                "required. Accepts `@devin-ai-sandbox` as an alias for the "
1854                "Devin AI sandbox workspace."
1855            ),
1856            default=None,
1857        ),
1858    ] = None,
1859    connection_ids: Annotated[
1860        list[str] | None,
1861        Field(
1862            description=(
1863                "Optional list of connection UUIDs. At least one of "
1864                "`organization_id`, `workspace_id`, or `connection_ids` is "
1865                "required."
1866            ),
1867            default=None,
1868        ),
1869    ] = None,
1870    status_filter: Annotated[
1871        StatusFilter,
1872        Field(
1873            description=(
1874                "Filter by job status: `all` (default), `succeeded`, or "
1875                "`failed`. Applied to `jobs.status` in the Prod DB Replica."
1876            ),
1877            default=StatusFilter.ALL,
1878        ),
1879    ] = StatusFilter.ALL,
1880    limit: Annotated[
1881        int,
1882        Field(
1883            description="Maximum number of attempt rows to return.",
1884            default=1000,
1885        ),
1886    ] = 1000,
1887) -> list[dict[str, Any]]:
    """List recent sync jobs and attempts from the Prod DB Replica.

    Returns one row per `(job, attempt)` pair for sync jobs whose `updated_at`
    falls in `[start_at, end_at)`, scoped to the provided organization,
    workspace, or connection IDs. Designed for live operational lookups
    (e.g. "what happened on this connection in the last hour"), not for
    historical analysis.

    Each row is enriched with `customer_tier` and `is_eu` for the owning
    organization. Tier filtering is intentionally not applied; this is a
    read-only observability query.

    Input requirements:
    - At least one of `organization_id`, `workspace_id`, or `connection_ids`
      must be provided (any combination is accepted).
    - `start_at` and `end_at` must be timezone-aware and `start_at < end_at`.

    Key fields in each row:
    - `job_id`, `attempt_id`, `attempt_number`
    - `job_status`, `attempt_status`
    - `job_started_at`, `job_updated_at`, `attempt_ended_at`
    - `failure_summary` (JSON; populated when an attempt failed)
    - `connection_id`, `connection_name`, `connection_status`
    - `source_actor_id`, `source_actor_name`, `source_actor_definition_id`
    - `destination_actor_id`, `destination_actor_name`,
      `destination_actor_definition_id`
    - `workspace_id`, `workspace_name`, `organization_id`
    - `dataplane_group_id`, `dataplane_name`
    - `customer_tier`, `is_eu` (added by tier enrichment)
    """
    # Resolve aliases such as `@airbyte-internal` before validating the scope.
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    # Require at least one of organization, workspace, or connection IDs.
    _validate_sync_activity_scope(
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
    )
    # Check the time window and use the normalized timestamps for the query.
    normalized_start_at, normalized_end_at = _validate_sync_activity_window(
        start_at=start_at,
        end_at=end_at,
    )

    rows = query_connection_sync_activity_from_prod(
        start_at=normalized_start_at,
        end_at=normalized_end_at,
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
        status_filter=status_filter.value,
        limit=limit,
    )
    # Add `customer_tier` and `is_eu` for each row's owning organization.
    return enrich_rows_by_org(rows)

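The input requirements in the docstring above (at least one scope argument, timezone-aware timestamps, and a half-open `[start_at, end_at)` window) can be sketched as follows. This is a minimal illustration under stated assumptions, not the module's code: `validate_scope`, `validate_window`, and `in_window` are hypothetical names, `ValueError` stands in for whatever error type the real `_validate_sync_activity_scope` and `_validate_sync_activity_window` raise, and normalizing to UTC is an assumption about what "normalized" means.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def validate_scope(
    organization_id: str | None,
    workspace_id: str | None,
    connection_ids: list[str] | None,
) -> None:
    """Require at least one scope argument (hypothetical helper)."""
    # Assumption: an empty list counts as "not provided".
    if not (organization_id or workspace_id or connection_ids):
        raise ValueError(
            "Provide at least one of `organization_id`, `workspace_id`, "
            "or `connection_ids`."
        )


def validate_window(
    start_at: datetime, end_at: datetime
) -> tuple[datetime, datetime]:
    """Check both timestamps are aware and ordered; return them in UTC."""
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        if value.tzinfo is None or value.utcoffset() is None:
            raise ValueError(f"`{name}` must be timezone-aware.")
    # Assumption: normalization means converting to UTC.
    start_utc = start_at.astimezone(timezone.utc)
    end_utc = end_at.astimezone(timezone.utc)
    if start_utc >= end_utc:
        raise ValueError("`start_at` must be strictly before `end_at`.")
    return start_utc, end_utc


def in_window(ts: datetime, start: datetime, end: datetime) -> bool:
    """Half-open membership test: start is inclusive, end is exclusive."""
    return start <= ts < end
```

With a half-open window, a job whose `updated_at` equals `end_at` is excluded, so consecutive windows such as 12:00 to 13:00 and 13:00 to 14:00 would not report the same job twice.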

# =============================================================================
# Pinned Connector Versions Models and Tools
# =============================================================================


class PinnedConnectorVersionInfo(BaseModel):
    """A connector version that has at least one scoped configuration pin."""

    version_id: str = Field(description="The actor_definition_version UUID")
    connector_definition_id: str = Field(description="The connector definition UUID")
    connector_name: str = Field(description="Human-readable connector name")
    docker_repository: str = Field(description="Docker repository path")
    docker_image_tag: str = Field(description="Docker image tag for this version")
    last_published: str | None = Field(
        default=None, description="ISO timestamp when this version was last published"
    )
    pin_count: int = Field(
        description="Total number of scoped_configuration rows pinning to this version"
    )
    breaking_change_pins: int = Field(
        default=0,
        description="Number of actor-scoped pins created by breaking changes",
    )
    rollout_pins: int = Field(
        default=0,
        description="Number of pins created by connector rollouts",
    )
    actor_pins: int = Field(
        description="Number of actor-scoped pins (excludes breaking change and rollout pins)"
    )
    workspace_pins: int = Field(description="Number of workspace-scoped pins")
    org_pins: int = Field(description="Number of organization-scoped pins")


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_connector_pin_stats(
    connector_definition_id: Annotated[
        str | None,
        Field(
            description="Connector definition UUID to filter by (optional). "
            "Mutually exclusive with `connector_canonical_name`."
        ),
    ] = None,
    connector_canonical_name: Annotated[
        str | None,
        Field(
            description="Connector canonical name (e.g. `source-postgres`) to filter by. "
            "Resolved to a definition ID via the registry. "
            "Mutually exclusive with `connector_definition_id`."
        ),
    ] = None,
) -> list[PinnedConnectorVersionInfo]:
    """Query connector versions that have at least one scoped configuration pin.

    Returns versions from the prod DB that are referenced by at least one
    `scoped_configuration` pin (`key = 'connector_version'`). Each version
    appears exactly once with per-scope pin breakdown (actor, workspace, org).

    If neither filter is provided, returns the global superset across all connectors.
    """
    # The two filters are mutually exclusive.
    if connector_definition_id and connector_canonical_name:
        raise PyAirbyteInputError(
            message=(
                "Provide at most one of `connector_definition_id` or "
                "`connector_canonical_name`, not both."
            ),
        )

    # Resolve a canonical name to a definition ID, or use the ID as given.
    # `resolved_id` stays None when no filter was provided.
    resolved_id: str | None = None
    if connector_canonical_name:
        resolved_id = resolve_canonical_name_to_definition_id(
            canonical_name=connector_canonical_name,
        )
    elif connector_definition_id:
        resolved_id = connector_definition_id

    rows = query_versions_with_pins(actor_definition_id=resolved_id)
    # Convert each DB row into a response model; timestamps become ISO strings.
    return [
        PinnedConnectorVersionInfo(
            version_id=str(row["version_id"]),
            connector_definition_id=str(row["connector_definition_id"]),
            connector_name=row["connector_name"],
            docker_repository=row["docker_repository"],
            docker_image_tag=row["docker_image_tag"],
            last_published=(
                row["last_published"].isoformat() if row.get("last_published") else None
            ),
            pin_count=row["pin_count"],
            breaking_change_pins=row.get("breaking_change_pins", 0),
            rollout_pins=row.get("rollout_pins", 0),
            actor_pins=row.get("actor_pins", 0),
            workspace_pins=row.get("workspace_pins", 0),
            org_pins=row.get("org_pins", 0),
        )
        for row in rows
    ]

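The "at most one of two filters" resolution used by `query_connector_pin_stats` can be tried in isolation with a stubbed registry. This is a sketch, not the module's implementation: `resolve_filter`, `FAKE_REGISTRY`, the `source-example` name, and the placeholder UUID are all made up for illustration, and `ValueError` stands in for `PyAirbyteInputError`.

```python
from __future__ import annotations

# Hypothetical stand-in for the registry lookup; the UUID is a placeholder.
FAKE_REGISTRY: dict[str, str] = {
    "source-example": "00000000-0000-0000-0000-000000000001",
}


def resolve_filter(
    connector_definition_id: str | None = None,
    connector_canonical_name: str | None = None,
    registry: dict[str, str] | None = None,
) -> str | None:
    """Return the definition ID to filter by, or None for no filter."""
    lookup = FAKE_REGISTRY if registry is None else registry
    if connector_definition_id and connector_canonical_name:
        raise ValueError(
            "Provide at most one of `connector_definition_id` or "
            "`connector_canonical_name`, not both."
        )
    if connector_canonical_name:
        # A name is translated to its definition ID via the (stubbed) registry.
        return lookup[connector_canonical_name]
    # An empty string is treated like None, matching the truthiness checks
    # in the function above.
    return connector_definition_id or None
```

Calling `resolve_filter()` with no arguments returns `None`, which corresponds to the unfiltered query across all connectors.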

def register_prod_db_query_tools(app: FastMCP) -> None:
    """Register prod DB query tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)