airbyte_ops_mcp.mcp.prod_db_queries
MCP tools for querying the Airbyte Cloud Prod DB Replica.
This module provides MCP tools that wrap the query functions from airbyte_ops_mcp.prod_db_access.queries for use by AI agents.
MCP reference
MCP primitives registered by the prod_db_queries module of the airbyte-internal-ops server: 16 tool(s), 0 prompt(s), 0 resource(s).
Tools (16)
query_connector_pin_stats
Hints: read-only · idempotent · open-world
Query connector versions that have at least one scoped configuration pin.
Returns versions from the prod DB that are referenced by at least one
scoped_configuration pin (key = 'connector_version'). Each version
appears exactly once with per-scope pin breakdown (actor, workspace, org).
If neither filter is provided, returns the global superset across all connectors.
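The mutual-exclusivity rule for the two filters can be sketched client-side as follows. This is only an illustration: `build_pin_stats_args` is a hypothetical helper, not part of the module, and the server may report a conflict differently.

```python
from typing import Optional


def build_pin_stats_args(
    connector_definition_id: Optional[str] = None,
    connector_canonical_name: Optional[str] = None,
) -> dict:
    """Build the arguments dict for query_connector_pin_stats (hypothetical helper)."""
    if connector_definition_id is not None and connector_canonical_name is not None:
        raise ValueError(
            "connector_definition_id and connector_canonical_name are mutually exclusive"
        )
    args = {}
    if connector_definition_id is not None:
        args["connector_definition_id"] = connector_definition_id
    if connector_canonical_name is not None:
        args["connector_canonical_name"] = connector_canonical_name
    # An empty dict corresponds to the unfiltered case: all connectors.
    return args


print(build_pin_stats_args(connector_canonical_name="source-postgres"))
```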
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_definition_id | string \| null | no | null | Connector definition UUID to filter by (optional). Mutually exclusive with connector_canonical_name. |
| connector_canonical_name | string \| null | no | null | Connector canonical name (e.g. source-postgres) to filter by. Resolved to a definition ID via the registry. Mutually exclusive with connector_definition_id. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector definition UUID to filter by (optional). Mutually exclusive with `connector_canonical_name`."
},
"connector_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector canonical name (e.g. `source-postgres`) to filter by. Resolved to a definition ID via the registry. Mutually exclusive with `connector_definition_id`."
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "A connector version that has at least one scoped configuration pin.",
"properties": {
"version_id": {
"description": "The actor_definition_version UUID",
"type": "string"
},
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_name": {
"description": "Human-readable connector name",
"type": "string"
},
"docker_repository": {
"description": "Docker repository path",
"type": "string"
},
"docker_image_tag": {
"description": "Docker image tag for this version",
"type": "string"
},
"last_published": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "ISO timestamp when this version was last published"
},
"pin_count": {
"description": "Total number of scoped_configuration rows pinning to this version",
"type": "integer"
},
"breaking_change_pins": {
"default": 0,
"description": "Number of actor-scoped pins created by breaking changes",
"type": "integer"
},
"rollout_pins": {
"default": 0,
"description": "Number of pins created by connector rollouts",
"type": "integer"
},
"actor_pins": {
"description": "Number of actor-scoped pins (excludes breaking change and rollout pins)",
"type": "integer"
},
"workspace_pins": {
"description": "Number of workspace-scoped pins",
"type": "integer"
},
"org_pins": {
"description": "Number of organization-scoped pins",
"type": "integer"
}
},
"required": [
"version_id",
"connector_definition_id",
"connector_name",
"docker_repository",
"docker_image_tag",
"pin_count",
"actor_pins",
"workspace_pins",
"org_pins"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_actors_by_pinned_connector_version
Hints: read-only · idempotent
List actors (sources/destinations) effectively pinned to a specific connector version.
Returns all actors that are effectively pinned to a specific connector version, considering all scope levels: actor-level pins, workspace-level pins, and organization-level pins (with actor > workspace > organization precedence). Useful for monitoring rollouts and understanding which customers are affected.
The actor_id field is the actor ID (superset of source_id/destination_id).
Returns a list of dicts with keys: actor_id, connector_definition_id, origin_type, origin, description, created_at, expires_at, pin_scope_type, actor_name, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name.
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from.
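The precedence rule (actor > workspace > organization) can be illustrated with a small sketch. The real resolution happens in the database query; `effective_pin` and the shape of `pins_by_scope` are assumptions made up for this example.

```python
from typing import Optional

# Scope levels in precedence order, as described above.
SCOPE_PRECEDENCE = ("actor", "workspace", "organization")


def effective_pin(pins_by_scope: dict) -> Optional[dict]:
    """Return the winning pin and its scope, or None if nothing is pinned.

    pins_by_scope maps a scope name to a pinned version ID (hypothetical shape).
    """
    for scope in SCOPE_PRECEDENCE:
        version_id = pins_by_scope.get(scope)
        if version_id is not None:
            return {"pin_scope_type": scope, "pinned_version_id": version_id}
    return None


# A workspace-level pin wins over an organization-level pin.
print(effective_pin({"workspace": "version-a", "organization": "version-b"}))
```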
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_version_id | string | yes | — | Connector version UUID to find pinned instances for |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"description": "Connector version UUID to find pinned instances for",
"type": "string"
}
},
"required": [
"connector_version_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connection_sync_activity
Hints: read-only · idempotent · open-world
List recent sync jobs and attempts from the Prod DB Replica.
Returns one row per (job, attempt) pair for sync jobs whose updated_at
falls in [start_at, end_at), scoped to the provided organization,
workspace, or connection IDs. Designed for live operational lookups —
e.g. "what happened on this connection in the last hour" — not for
historical analysis.
Each row is enriched with customer_tier and is_eu for the owning
organization. Tier filtering is intentionally not applied — this is a
read-only observability query.
Input requirements:
- At least one of organization_id, workspace_id, or connection_ids must be provided (any combination is accepted).
- start_at and end_at must be timezone-aware and start_at < end_at.
Key fields in each row:
- job_id, attempt_id, attempt_number
- job_status, attempt_status
- job_started_at, job_updated_at, attempt_ended_at
- failure_summary (JSON; populated when an attempt failed)
- connection_id, connection_name, connection_status
- source_actor_id, source_actor_name, source_actor_definition_id
- destination_actor_id, destination_actor_name, destination_actor_definition_id
- workspace_id, workspace_name, organization_id
- dataplane_group_id, dataplane_name
- customer_tier, is_eu (added by tier enrichment)
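As a rough sketch of the input requirements, the following helper checks the half-open window [start_at, end_at) and the scoping rule before building the arguments. `build_sync_activity_args` is a hypothetical name, and the server-side validation may differ in its details.

```python
from datetime import datetime, timedelta, timezone


def build_sync_activity_args(start_at, end_at, organization_id=None,
                             workspace_id=None, connection_ids=None) -> dict:
    """Validate inputs and build the arguments dict (hypothetical helper)."""
    for name, value in (("start_at", start_at), ("end_at", end_at)):
        # A datetime is timezone-aware only if it yields a UTC offset.
        if value.tzinfo is None or value.utcoffset() is None:
            raise ValueError(f"{name} must be timezone-aware")
    if not start_at < end_at:
        raise ValueError("start_at must be strictly before end_at")
    scope = {
        "organization_id": organization_id,
        "workspace_id": workspace_id,
        "connection_ids": connection_ids,
    }
    scope = {key: value for key, value in scope.items() if value}
    if not scope:
        raise ValueError(
            "provide at least one of organization_id, workspace_id, connection_ids"
        )
    return {"start_at": start_at.isoformat(), "end_at": end_at.isoformat(), **scope}


# "What happened in this workspace in the last hour?"
now = datetime.now(timezone.utc)
print(build_sync_activity_args(now - timedelta(hours=1), now,
                               workspace_id="@devin-ai-sandbox"))
```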
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| start_at | string | yes | — | Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or Z). |
| end_at | string | yes | — | Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after start_at. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @airbyte-internal as an alias for the Airbyte internal org. |
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") \| null | no | null | Optional workspace UUID or alias. At least one of organization_id, workspace_id, or connection_ids is required. Accepts @devin-ai-sandbox as an alias for the Devin AI sandbox workspace. |
| connection_ids | array<string> \| null | no | null | Optional list of connection UUIDs. At least one of organization_id, workspace_id, or connection_ids is required. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: all (default), succeeded, or failed. Applied to jobs.status in the Prod DB Replica. |
| limit | integer | no | 1000 | Maximum number of attempt rows to return. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"start_at": {
"description": "Inclusive start timestamp for the sync activity window. Must be timezone-aware (ISO 8601 with offset or `Z`).",
"format": "date-time",
"type": "string"
},
"end_at": {
"description": "Exclusive end timestamp for the sync activity window. Must be timezone-aware and strictly after `start_at`.",
"format": "date-time",
"type": "string"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@airbyte-internal` as an alias for the Airbyte internal org."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional workspace UUID or alias. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required. Accepts `@devin-ai-sandbox` as an alias for the Devin AI sandbox workspace."
},
"connection_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional list of connection UUIDs. At least one of `organization_id`, `workspace_id`, or `connection_ids` is required."
},
"status_filter": {
"description": "Filter by job status: `all` (default), `succeeded`, or `failed`. Applied to `jobs.status` in the Prod DB Replica.",
"enum": [
"all",
"succeeded",
"failed"
],
"type": "string",
"default": "all"
},
"limit": {
"default": 1000,
"description": "Maximum number of attempt rows to return.",
"type": "integer"
}
},
"required": [
"start_at",
"end_at"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connections_by_connector
Hints: read-only · idempotent · open-world
Search for all connections using a specific source or destination connector type.
This tool queries the Airbyte Cloud Prod DB Replica directly for fast results. It finds all connections where the source or destination connector matches the specified type, regardless of how the connector is named by users.
Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter is always applied so that queries stay tier-aware; the input schema does not mark it as required, and it defaults to TIER_2 when omitted.
Optionally filter by organization_id to limit results to a specific organization. Use '@airbyte-internal' as an alias for the Airbyte internal organization.
Set exclude_pinned=True to filter out connections that are already pinned to a
specific version. This is useful for 'prove fix' live connection testing workflows
where you want to find unpinned connections to test against.
Set enabled_schedules_only=True to restrict results to connections that are both
enabled (status='active') and on an automated schedule (not manual-trigger-only).
This is useful for canary prerelease workflows where you need connections that
will run organically during the monitoring window.
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
- For source queries, returns: connection_id, connection_name, connection_url, source_id, source_name, source_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
- For destination queries, returns: connection_id, connection_name, connection_url, destination_id, destination_name, destination_definition_id, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.
pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope level the effective pin came from (NULL if not pinned).
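The "exactly one connector identifier" rule from the parameter descriptions can be sketched as a client-side check. `build_connections_by_connector_args` is a hypothetical helper for illustration; the tool itself may validate and report errors differently.

```python
# The four connector identifiers; exactly one must be supplied.
IDENTIFIER_KEYS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def build_connections_by_connector_args(**kwargs) -> dict:
    """Check the identifier rule and drop unset arguments (hypothetical helper)."""
    provided = [key for key in IDENTIFIER_KEYS if kwargs.get(key) is not None]
    if len(provided) != 1:
        raise ValueError(
            f"exactly one of {', '.join(IDENTIFIER_KEYS)} is required, got {provided}"
        )
    return {key: value for key, value in kwargs.items() if value is not None}


# Canary-style lookup: unpinned connections that will sync on their own schedule.
print(build_connections_by_connector_args(
    source_canonical_name="source-youtube-analytics",
    customer_tier_filter="TIER_2",
    exclude_pinned=True,
    enabled_schedules_only=True,
))
```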
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 1000 | Maximum number of results (default: 1000) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections). |
| enabled_schedules_only | boolean | no | false | If True, only return connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Exactly one of source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name is required. Examples: 'destination-duckdb', 'DuckDB'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 1000,
"description": "Maximum number of results (default: 1000)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude connections whose connector is already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all connections).",
"type": "boolean"
},
"enabled_schedules_only": {
"default": false,
"description": "If True, only return connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections).",
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connections_by_stream
Hints: read-only · idempotent · open-world
Find connections that have a specific stream enabled in their catalog.
This tool searches the connection's configured catalog (JSONB) for streams matching the specified name. It's particularly useful when validating connector fixes that affect specific streams - you can quickly find customer connections that use the affected stream.
Results are always enriched with customer_tier and is_eu fields. The customer_tier_filter parameter is always applied so that queries stay tier-aware; the input schema does not mark it as required, and it defaults to TIER_2 when omitted.
Use cases:
- Finding connections with a specific stream enabled for regression testing
- Validating connector fixes that affect particular streams
- Identifying which customers use rarely-enabled streams
Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
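To make the exact-name matching concrete, here is a minimal in-memory sketch of the check. It assumes the configured catalog follows the usual Airbyte layout of `{"streams": [{"stream": {"name": ...}}]}`; the actual tool runs the search against the JSONB column in the database, and `catalog_has_stream` is a hypothetical name.

```python
def catalog_has_stream(catalog: dict, stream_name: str) -> bool:
    """Return True if the catalog contains a stream with exactly this name.

    Assumes the catalog layout {"streams": [{"stream": {"name": ...}}, ...]}.
    """
    return any(
        entry.get("stream", {}).get("name") == stream_name
        for entry in catalog.get("streams", [])
    )


sample_catalog = {
    "streams": [
        {"stream": {"name": "campaigns"}, "sync_mode": "incremental"},
        {"stream": {"name": "global_exclusions"}, "sync_mode": "full_refresh"},
    ]
}
print(catalog_has_stream(sample_catalog, "global_exclusions"))
```

Note that the comparison is exact, so a name such as 'Campaigns' would not match the 'campaigns' stream.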
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| stream_name | string | yes | — | Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'. |
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"stream_name": {
"description": "Name of the stream to search for in connection catalogs. This must match the exact stream name as configured in the connection. Examples: 'global_exclusions', 'campaigns', 'users'.",
"type": "string"
},
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id (exactly one required). Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only connections in this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"required": [
"stream_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connector_connection_stats
Hints: read-only · idempotent · open-world
Get aggregate connection stats for multiple connectors.
Returns counts of connections grouped by pinned version for each connector, including:
- Total, enabled, and active connection counts
- Pinned vs unpinned breakdown
- Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)
This tool is designed for release monitoring workflows. It allows you to:
- Query recently released connectors to identify which ones to monitor
- Get aggregate stats showing how many connections are using each version
- See health metrics (pass/fail) broken down by version
The lookback_days parameter controls the lookback window for:
- Counting 'active' connections (those with recent sync activity)
- Determining 'latest attempt status' (most recent attempt within the window)
Connections with no sync activity in the lookback window will have 'unknown' status in the latest_attempt breakdown.
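For release monitoring, one way to read the per-version health metrics is to turn the latest_attempt counts into a failure rate. The sketch below uses the field names from the output schema; `failure_rates_by_version` and the sample data are invented for illustration, and ignoring cancelled, running, and unknown attempts is a choice made here, not something the tool prescribes.

```python
def failure_rates_by_version(connector_stats: dict) -> dict:
    """Map each version to failed / (succeeded + failed), or None if no data."""
    rates = {}
    for version in connector_stats["by_version"]:
        attempts = version["latest_attempt"]
        failed = attempts.get("failed", 0)
        finished = attempts.get("succeeded", 0) + failed
        # Prefer the image tag as a label; unpinned rows have no version ID.
        label = (
            version.get("docker_image_tag")
            or version["pinned_version_id"]
            or "unpinned"
        )
        rates[label] = failed / finished if finished else None
    return rates


# Invented sample shaped like one entry of the "sources" list.
sample_stats = {
    "by_version": [
        {"pinned_version_id": "v-1", "docker_image_tag": "1.2.3",
         "latest_attempt": {"succeeded": 3, "failed": 1}},
        {"pinned_version_id": None, "docker_image_tag": None,
         "latest_attempt": {"unknown": 5}},
    ]
}
print(failure_rates_by_version(sample_stats))
```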
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_ids | array<string> \| null | no | null | List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139'] |
| destination_definition_ids | array<string> \| null | no | null | List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610'] |
| lookback_days | integer | no | 7 | Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of source connector definition IDs (UUIDs) to get stats for. Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
},
"destination_definition_ids": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "List of destination connector definition IDs (UUIDs) to get stats for. Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back for 'active' connections (default: 7). Connections with sync activity within this window are counted as active.",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Response containing connection stats for multiple connectors.",
"properties": {
"sources": {
"description": "Stats for source connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"destinations": {
"description": "Stats for destination connectors",
"items": {
"description": "Aggregate connection stats for a connector.",
"properties": {
"connector_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"connector_type": {
"description": "'source' or 'destination'",
"type": "string"
},
"canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The canonical connector name if resolved"
},
"total_connections": {
"description": "Total number of non-deprecated connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"pinned_connections": {
"description": "Number of connections with explicit version pins",
"type": "integer"
},
"unpinned_connections": {
"description": "Number of connections on default version",
"type": "integer"
},
"latest_attempt": {
"description": "Overall breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
},
"by_version": {
"description": "Stats broken down by pinned version",
"items": {
"description": "Stats for connections pinned to a specific version.",
"properties": {
"pinned_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"description": "The connector version UUID (None for unpinned connections)"
},
"docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The docker image tag for this version"
},
"total_connections": {
"description": "Total number of connections",
"type": "integer"
},
"enabled_connections": {
"description": "Number of enabled (active status) connections",
"type": "integer"
},
"active_connections": {
"description": "Number of connections with recent sync activity",
"type": "integer"
},
"latest_attempt": {
"description": "Breakdown by latest attempt status",
"properties": {
"succeeded": {
"default": 0,
"description": "Connections where latest attempt succeeded",
"type": "integer"
},
"failed": {
"default": 0,
"description": "Connections where latest attempt failed",
"type": "integer"
},
"cancelled": {
"default": 0,
"description": "Connections where latest attempt was cancelled",
"type": "integer"
},
"running": {
"default": 0,
"description": "Connections where latest attempt is still running",
"type": "integer"
},
"unknown": {
"default": 0,
"description": "Connections with no recent attempts in the lookback window",
"type": "integer"
}
},
"type": "object"
}
},
"required": [
"pinned_version_id",
"total_connections",
"enabled_connections",
"active_connections",
"latest_attempt"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"connector_definition_id",
"connector_type",
"total_connections",
"enabled_connections",
"active_connections",
"pinned_connections",
"unpinned_connections",
"latest_attempt",
"by_version"
],
"type": "object"
},
"type": "array"
},
"lookback_days": {
"description": "Lookback window used for 'active' connections",
"type": "integer"
},
"generated_at": {
"description": "When this response was generated",
"format": "date-time",
"type": "string"
}
},
"required": [
"lookback_days",
"generated_at"
],
"type": "object"
}
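As an illustration of how the `latest_attempt` breakdown in the schema above might be consumed, the sketch below computes a failure rate from the documented counters. The helper name `failure_rate` and the choice to leave `running` and `unknown` out of the denominator are assumptions made for this example, not behavior of the tool.

```python
from typing import Mapping, Optional


def failure_rate(latest_attempt: Mapping[str, int]) -> Optional[float]:
    """Return failed / (succeeded + failed + cancelled), or None if nothing finished.

    Missing counters are treated as 0, mirroring the schema defaults.
    """
    succeeded = latest_attempt.get("succeeded", 0)
    failed = latest_attempt.get("failed", 0)
    cancelled = latest_attempt.get("cancelled", 0)
    finished = succeeded + failed + cancelled
    if finished == 0:
        return None
    return failed / finished


# Hypothetical sample shaped like the documented `latest_attempt` object.
sample = {"succeeded": 30, "failed": 10, "cancelled": 0, "running": 5, "unknown": 55}
print(failure_rate(sample))
```

Excluding `running` and `unknown` keeps connections with no finished attempt in the lookback window from diluting the rate; a different denominator may suit other analyses.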
query_prod_connector_rollouts
Hints: read-only · idempotent
Query connector rollouts with flexible filtering.
Returns rollouts based on the provided filters. If no filters are specified, returns all active rollouts. Useful for monitoring rollout status and history.
Filter behavior:
- rollout_id: Returns that specific rollout (ignores other filters)
- active_only: Returns only active (non-terminal) rollouts
- actor_definition_id: Returns rollouts for that specific connector
- No filters: Returns all active rollouts (same as active_only=True)
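The filter precedence listed above can be sketched over an in-memory list of rollout records. This is only an illustration, not the tool's implementation: the function name `select_rollouts`, the set of states treated as terminal, and the decision to combine `actor_definition_id` with `active_only` as a logical AND are all assumptions.

```python
from typing import Any, Dict, List, Optional

# Assumption: these states are treated as terminal. The reference lists the
# possible states but does not say which ones count as terminal.
ASSUMED_TERMINAL_STATES = {"succeeded", "failed_rolled_back", "canceled"}


def select_rollouts(
    rollouts: List[Dict[str, Any]],
    actor_definition_id: Optional[str] = None,
    rollout_id: Optional[str] = None,
    active_only: bool = False,
    limit: int = 100,
) -> List[Dict[str, Any]]:
    """Apply the documented filter precedence to a list of rollout dicts."""
    # rollout_id wins outright and ignores every other filter.
    if rollout_id is not None:
        return [r for r in rollouts if r["rollout_id"] == rollout_id][:limit]

    # With no filters at all, behave as if active_only were True.
    if actor_definition_id is None and not active_only:
        active_only = True

    selected = rollouts
    if actor_definition_id is not None:
        selected = [r for r in selected if r["actor_definition_id"] == actor_definition_id]
    if active_only:
        selected = [r for r in selected if r["state"] not in ASSUMED_TERMINAL_STATES]
    return selected[:limit]


# Hypothetical records using only documented required fields.
rollouts = [
    {"rollout_id": "r1", "actor_definition_id": "a1", "state": "in_progress"},
    {"rollout_id": "r2", "actor_definition_id": "a1", "state": "succeeded"},
    {"rollout_id": "r3", "actor_definition_id": "a2", "state": "paused"},
]
print([r["rollout_id"] for r in select_rollouts(rollouts)])
```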
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| actor_definition_id | string \| null | no | null | Connector definition UUID to filter by (optional) |
| rollout_id | string \| null | no | null | Specific rollout UUID to look up (optional) |
| active_only | boolean | no | false | If true, only return active (non-terminal) rollouts |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"actor_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector definition UUID to filter by (optional)"
},
"rollout_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Specific rollout UUID to look up (optional)"
},
"active_only": {
"default": false,
"description": "If true, only return active (non-terminal) rollouts",
"type": "boolean"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a connector rollout.",
"properties": {
"rollout_id": {
"description": "The rollout UUID",
"type": "string"
},
"actor_definition_id": {
"description": "The connector definition UUID",
"type": "string"
},
"state": {
"description": "Rollout state: initialized, workflow_started, in_progress, paused, finalizing, succeeded, errored, failed_rolled_back, canceled",
"type": "string"
},
"initial_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Initial rollout percentage"
},
"current_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Current target rollout percentage"
},
"final_target_rollout_pct": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Final target rollout percentage"
},
"has_breaking_changes": {
"description": "Whether the RC has breaking changes",
"type": "boolean"
},
"max_step_wait_time_mins": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Maximum wait time between rollout steps in minutes"
},
"rollout_strategy": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Rollout strategy: manual, automated, overridden"
},
"updated_by_user_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "User ID recorded as last updating the rollout"
},
"updated_by_user_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Name recorded as last updating the rollout"
},
"updated_by_user_email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Email recorded as last updating the rollout"
},
"workflow_run_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Temporal workflow run ID"
},
"error_msg": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Error message if errored"
},
"failed_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for failure if failed"
},
"paused_reason": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Reason for pause if paused"
},
"tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional tag for the rollout"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was created"
},
"updated_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout was last updated"
},
"completed_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout completed (if terminal)"
},
"expires_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the rollout expires"
},
"rc_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the release candidate"
},
"rc_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the release candidate"
},
"initial_docker_image_tag": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker image tag of the initial version"
},
"initial_docker_repository": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Docker repository of the initial version"
},
"filters": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
],
"default": null,
"description": "Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier targeted by this rollout (extracted from filters), e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set."
}
},
"required": [
"rollout_id",
"actor_definition_id",
"state",
"has_breaking_changes"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_connector_versions
Hints: read-only · idempotent
List all versions for a connector definition.
Returns all published versions of a connector, ordered by last_published date descending. Useful for understanding version history and finding specific version IDs for pinning or rollout monitoring.
Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, release_stage, support_level, cdk_version, language, last_published, release_date
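Since the result is a plain list of dicts, looking up a version ID for pinning is a simple scan. The sketch below assumes rows shaped like the key list above; the helper name `find_version_id` and the sample IDs are hypothetical.

```python
from typing import Any, Dict, List, Optional


def find_version_id(versions: List[Dict[str, Any]], docker_image_tag: str) -> Optional[str]:
    """Return the version_id of the first row whose docker_image_tag matches, else None."""
    for row in versions:
        if row.get("docker_image_tag") == docker_image_tag:
            return row.get("version_id")
    return None


# Hypothetical rows using a subset of the documented keys.
versions = [
    {"version_id": "v-002", "docker_image_tag": "0.3.59"},
    {"version_id": "v-001", "docker_image_tag": "0.3.58"},
]
print(find_version_id(versions, "0.3.58"))
```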
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_definition_id | string | yes | — | Connector definition UUID to list versions for |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_definition_id": {
"description": "Connector definition UUID to list versions for",
"type": "string"
}
},
"required": [
"connector_definition_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_dataplanes
Hints: read-only · idempotent
List all dataplane groups with workspace counts.
Returns information about all active dataplane groups in Airbyte Cloud, including the number of workspaces in each. Useful for understanding the distribution of workspaces across regions (US, US-Central, EU).
Returns list of dicts with keys: dataplane_group_id, dataplane_name, organization_id, enabled, tombstone, created_at, workspace_count
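To see the regional distribution at a glance, the returned rows can be rolled up by `dataplane_name`. The sketch below is a hypothetical post-processing helper (`workspaces_per_dataplane` is not part of the module). The extra check on `enabled` and `tombstone` is a defensive assumption and may be redundant if the tool already returns only active groups.

```python
from collections import Counter
from typing import Any, Dict, List


def workspaces_per_dataplane(groups: List[Dict[str, Any]]) -> Dict[str, int]:
    """Sum workspace_count per dataplane_name, skipping disabled or tombstoned groups."""
    totals: Counter = Counter()
    for group in groups:
        if not group.get("enabled") or group.get("tombstone"):
            continue
        totals[group["dataplane_name"]] += group.get("workspace_count", 0)
    return dict(totals)


# Hypothetical rows; the counts are made up for illustration.
groups = [
    {"dataplane_name": "US", "enabled": True, "tombstone": False, "workspace_count": 120},
    {"dataplane_name": "EU", "enabled": True, "tombstone": False, "workspace_count": 45},
    {"dataplane_name": "EU", "enabled": False, "tombstone": False, "workspace_count": 3},
]
print(workspaces_per_dataplane(groups))
```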
Parameters:
_No parameters._
Show input JSON schema
{
"additionalProperties": false,
"properties": {},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_failed_sync_attempts_for_connector
Hints: read-only · idempotent · open-world
List failed sync attempts for ALL actors using a source connector type.
This tool finds all actors with the given connector definition and returns their failed sync attempts, regardless of whether they have explicit version pins.
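Results are always enriched with customer_tier and is_eu fields. A customer_tier_filter is always applied to keep queries tier-aware; per the parameter table it is optional in the schema and defaults to 'TIER_2', so pass 'ALL' to include every tier.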
This is useful for investigating connector issues across all users. Use this when you want to find failures for a connector type regardless of which version users are on.
Note: This tool only supports SOURCE connectors. For destination connectors, a separate tool would be needed.
Key fields in results:
- failure_summary: JSON containing failure details including failureType and messages
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
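The key fields above lend themselves to a quick triage summary. The following sketch counts failed attempts per customer tier and pin scope; the helper name `tally_failures`, the `"unpinned"` and `"UNKNOWN"` labels, and the sample rows are assumptions for illustration only.

```python
from collections import Counter
from typing import Any, Dict, List, Tuple


def tally_failures(rows: List[Dict[str, Any]]) -> Dict[Tuple[str, str], int]:
    """Count rows per (customer_tier, pin scope).

    A null pin_scope_type is reported as 'unpinned'; a missing tier as 'UNKNOWN'.
    """
    counts: Counter = Counter()
    for row in rows:
        tier = row.get("customer_tier") or "UNKNOWN"
        scope = row.get("pin_scope_type") or "unpinned"
        counts[(tier, scope)] += 1
    return dict(counts)


# Hypothetical result rows carrying only the fields used here.
rows = [
    {"customer_tier": "TIER_1", "pin_scope_type": None},
    {"customer_tier": "TIER_1", "pin_scope_type": "workspace"},
    {"customer_tier": "TIER_1", "pin_scope_type": None},
    {"customer_tier": "TIER_2", "pin_scope_type": "actor"},
]
print(tally_failures(rows))
```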
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Exactly one of this or source_canonical_name is required. Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Exactly one of this or source_definition_id is required. Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only failed attempts from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_new_connector_releases
Hints: read-only · idempotent
List recently published connector versions.
Returns connector versions published within the specified number of days. Uses last_published timestamp which reflects when the version was actually deployed to the registry (not the changelog date).
Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, docker_image_tag, last_published, release_date, release_stage, support_level, cdk_version, language, created_at
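A common follow-up is reducing the release list to the newest tag per connector. The sketch below assumes `last_published` arrives as an ISO 8601 string with an explicit UTC offset; the tool may instead return datetime objects or another format, in which case the parsing line would need to change. The helper name and the sample rows are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List


def latest_release_per_repository(rows: List[Dict[str, Any]]) -> Dict[str, str]:
    """Map docker_repository to the docker_image_tag with the newest last_published."""
    latest: Dict[str, Dict[str, Any]] = {}
    for row in rows:
        repo = row["docker_repository"]
        # Assumption: last_published is an ISO 8601 string such as
        # "2024-05-02T10:00:00+00:00".
        published = datetime.fromisoformat(row["last_published"])
        if repo not in latest or published > latest[repo]["published"]:
            latest[repo] = {"published": published, "tag": row["docker_image_tag"]}
    return {repo: entry["tag"] for repo, entry in latest.items()}


# Hypothetical rows; the timestamps are made up for illustration.
rows = [
    {"docker_repository": "airbyte/source-pokeapi", "docker_image_tag": "0.3.58",
     "last_published": "2024-05-01T09:00:00+00:00"},
    {"docker_repository": "airbyte/source-pokeapi", "docker_image_tag": "0.3.59",
     "last_published": "2024-05-02T10:00:00+00:00"},
    {"docker_repository": "airbyte/destination-duckdb", "docker_image_tag": "0.4.0",
     "last_published": "2024-05-01T12:00:00+00:00"},
]
print(latest_release_per_repository(rows))
```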
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_organizations
Hints: read-only · idempotent
Search organizations by name or email substring.
Performs a case-insensitive substring match on organization name and email.
Use the returned organization_id values with other tools like query_prod_connections_by_connector or lookup_customer_tiers.
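To feed the returned IDs into follow-up calls, it can help to group them by tier first. This sketch works on a response shaped like the output schema documented for this tool (`organizations` entries with `organization_id` and an optional `customer_tier`). The helper name, the `"UNKNOWN"` bucket, and the sample IDs are hypothetical.

```python
from collections import defaultdict
from typing import Any, Dict, List


def org_ids_by_tier(response: Dict[str, Any]) -> Dict[str, List[str]]:
    """Group organization_id values by customer_tier; a null tier goes under 'UNKNOWN'."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for org in response.get("organizations", []):
        tier = org.get("customer_tier") or "UNKNOWN"
        grouped[tier].append(org["organization_id"])
    return dict(grouped)


# Hypothetical response; IDs are placeholders, not real organization UUIDs.
response = {
    "name_contains": "acme",
    "total_found": 3,
    "organizations": [
        {"organization_id": "org-1", "organization_name": "Acme Corp", "customer_tier": "TIER_1"},
        {"organization_id": "org-2", "organization_name": "Acme Labs", "customer_tier": None},
        {"organization_id": "org-3", "organization_name": "Acme EU", "customer_tier": "TIER_1"},
    ],
}
print(org_ids_by_tier(response))
```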
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name_contains | string | yes | — | Case-insensitive substring to search for in organization name or email. For example, 'acme' will match organizations named 'Acme Corp' or with email 'admin@acme.io'. |
| limit | integer | no | 20 | Maximum number of organizations to return (default: 20) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name_contains": {
"description": "Case-insensitive substring to search for in organization name or email. For example, 'acme' will match organizations named 'Acme Corp' or with email 'admin@acme.io'.",
"type": "string"
},
"limit": {
"default": 20,
"description": "Maximum number of organizations to return (default: 20)",
"type": "integer"
}
},
"required": [
"name_contains"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of searching organizations by name substring.",
"properties": {
"name_contains": {
"description": "The search substring that was used",
"type": "string"
},
"total_found": {
"description": "Total number of organizations matching",
"type": "integer"
},
"organizations": {
"description": "List of matching organizations",
"items": {
"description": "A single organization returned by a name/email search.",
"properties": {
"organization_id": {
"description": "The organization UUID",
"type": "string"
},
"organization_name": {
"description": "The name of the organization",
"type": "string"
},
"email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The email address associated with the organization"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
}
},
"required": [
"organization_id",
"organization_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"name_contains",
"total_found",
"organizations"
],
"type": "object"
}
query_prod_recent_syncs_for_connector
Hints: read-only · idempotent · open-world
List recent sync jobs for ALL actors using a connector type.
This tool finds all actors with the given connector definition and returns their recent sync jobs, regardless of whether they have explicit version pins. It filters out deleted actors, deleted workspaces, and deprecated connections.
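Results are always enriched with customer_tier and is_eu fields. A customer_tier_filter is always applied to keep queries tier-aware; per the parameter table it is optional in the schema and defaults to 'TIER_2', so pass 'ALL' to include every tier.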
Use this tool to:
- Find healthy connections with recent successful syncs (status_filter='succeeded')
- Investigate connector issues across all users (status_filter='failed')
- Get an overview of all recent sync activity (status_filter='all')
Set exclude_pinned=True to filter out syncs for actors that are already pinned to a specific version. This is useful for 'prove fix' live connection testing workflows where you want to find unpinned connections to test against.
Set enabled_schedules_only=True to restrict results to connections that are both enabled (status='active') and on an automated schedule (not manual-trigger-only). This is useful for canary prerelease workflows where you need connections that will run organically during the monitoring window.
Supports both SOURCE and DESTINATION connectors. Provide exactly one of: source_definition_id, source_canonical_name, destination_definition_id, or destination_canonical_name.
Key fields in results:
- job_status: 'succeeded', 'failed', 'cancelled', etc.
- connection_id, connection_name: The connection that ran the sync
- actor_id, actor_name: The source or destination actor
- customer_tier: TIER_0, TIER_1, or TIER_2
- is_eu: Whether the workspace is in the EU region
- pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned)
- pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned)
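Because exactly one of the four connector identifiers must be supplied, a caller may want to validate arguments before invoking the tool. The sketch below is a hypothetical client-side helper (`build_recent_syncs_args` is not part of the module). It checks only the constraints stated in this reference and assembles a plain arguments dict; how that dict is sent to the MCP server is left out.

```python
from typing import Any, Dict, Optional

IDENTIFIER_FIELDS = (
    "source_definition_id",
    "source_canonical_name",
    "destination_definition_id",
    "destination_canonical_name",
)


def build_recent_syncs_args(
    *,
    status_filter: str = "all",
    customer_tier_filter: str = "TIER_2",
    exclude_pinned: bool = False,
    enabled_schedules_only: bool = False,
    **identifiers: Optional[str],
) -> Dict[str, Any]:
    """Validate the documented constraints and return an arguments dict."""
    unknown = set(identifiers) - set(IDENTIFIER_FIELDS)
    if unknown:
        raise ValueError(f"Unexpected identifier fields: {sorted(unknown)}")
    provided = {k: v for k, v in identifiers.items() if v is not None}
    if len(provided) != 1:
        raise ValueError("Provide exactly one of: " + ", ".join(IDENTIFIER_FIELDS))
    if status_filter not in {"all", "succeeded", "failed"}:
        raise ValueError(f"Invalid status_filter: {status_filter!r}")
    if customer_tier_filter not in {"TIER_0", "TIER_1", "TIER_2", "ALL"}:
        raise ValueError(f"Invalid customer_tier_filter: {customer_tier_filter!r}")
    return {
        **provided,
        "status_filter": status_filter,
        "customer_tier_filter": customer_tier_filter,
        "exclude_pinned": exclude_pinned,
        "enabled_schedules_only": enabled_schedules_only,
    }


# Example: unpinned, healthy DuckDB destination connections for a 'prove fix' run.
args = build_recent_syncs_args(
    destination_canonical_name="destination-duckdb",
    status_filter="succeeded",
    exclude_pinned=True,
)
print(args)
```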
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_definition_id | string \| null | no | null | Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics. |
| source_canonical_name | string \| null | no | null | Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'. |
| destination_definition_id | string \| null | no | null | Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB. |
| destination_canonical_name | string \| null | no | null | Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'. |
| status_filter | enum("all", "succeeded", "failed") | no | "all" | Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures. |
| organization_id | string \| enum("664c690e-5263-49ba-b01f-4a6759b3330a") \| null | no | null | Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org. |
| lookback_days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| customer_tier_filter | enum("TIER_0", "TIER_1", "TIER_2", "ALL") | no | "TIER_2" | Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers. |
| exclude_pinned | boolean | no | false | If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs). |
| enabled_schedules_only | boolean | no | false | If True, only return syncs for connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Source connector definition ID (UUID) to search for. Provide this OR source_canonical_name OR destination_definition_id OR destination_canonical_name (exactly one required). Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
},
"source_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical source connector name to search for. Provide this OR source_definition_id OR destination_definition_id OR destination_canonical_name (exactly one required). Examples: 'source-youtube-analytics', 'YouTube Analytics'."
},
"destination_definition_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Destination connector definition ID (UUID) to search for. Provide this OR destination_canonical_name OR source_definition_id OR source_canonical_name (exactly one required). Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB."
},
"destination_canonical_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical destination connector name to search for. Provide this OR destination_definition_id OR source_definition_id OR source_canonical_name (exactly one required). Examples: 'destination-duckdb', 'DuckDB'."
},
"status_filter": {
"description": "Filter by job status: 'all' (default), 'succeeded', or 'failed'. Use 'succeeded' to find healthy connections with recent successful syncs. Use 'failed' to find connections with recent failures.",
"enum": [
"all",
"succeeded",
"failed"
],
"type": "string",
"default": "all"
},
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Organization ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@airbyte-internal\") and its value\nis the actual organization UUID. Use `OrganizationAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"664c690e-5263-49ba-b01f-4a6759b3330a"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional organization ID (UUID) or alias to filter results. If provided, only syncs from this organization will be returned. Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
},
"lookback_days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"customer_tier_filter": {
"default": "TIER_2",
"description": "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. Filters results to only include connections belonging to organizations in the specified tier. Use 'ALL' to include all tiers.",
"enum": [
"TIER_0",
"TIER_1",
"TIER_2",
"ALL"
],
"type": "string"
},
"exclude_pinned": {
"default": false,
"description": "If True, exclude syncs for actors that are already pinned to a specific version (at any scope level: actor, workspace, or organization). Useful for 'prove fix' workflows where you want to find unpinned connections for live testing. Default: False (include all syncs).",
"type": "boolean"
},
"enabled_schedules_only": {
"default": false,
"description": "If True, only return syncs for connections that are both active (not paused/inactive) and on an automated sync schedule (not manual-trigger-only). Useful for canary workflows where you need connections that will produce organic syncs during a monitoring window. Default: False (include all connections).",
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_recent_syncs_for_connector_version
Hints: read-only · idempotent
List sync jobs that were run with a specific connector version.
Works for both source and destination connectors. Automatically detects the connector type from the version metadata and uses the appropriate query variant.
Accepts either connector_version_id (UUID) or connector_name + connector_version (e.g. source-pokeapi + 0.3.59). When using name + version, the docker_repository is derived from the canonical name (e.g. source-pokeapi → airbyte/source-pokeapi).
Filters on the version stamped into jobs.config at job-creation time, not the current pin state. This avoids false positives (pre-pin syncs counted as RC) and false negatives (post-unpin syncs missed).
Pin columns (pin_origin_type, pin_origin, pin_scope_type) are still included as informational output but are not used for filtering.
Returns list of dicts with keys: job_id, connection_id, job_status, started_at, job_updated_at, connection_name, actor_id, actor_name, actor_definition_id, source_definition_version_id, destination_definition_version_id, pin_origin_type, pin_origin, pin_scope_type, workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name.
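The two ways of identifying a version can be checked client-side before calling the tool. The sketch below is a hypothetical helper, not the module's own resolution logic. It mirrors the documented name-to-repository example and rejects ambiguous input. Treating "both a UUID and a name + version pair" as an error is an assumption, since the reference does not say how the tool handles that case.

```python
from typing import Any, Dict, Optional


def derive_docker_repository(connector_name: str) -> str:
    """Mirror the documented example: source-pokeapi -> airbyte/source-pokeapi."""
    return f"airbyte/{connector_name}"


def build_version_syncs_args(
    connector_version_id: Optional[str] = None,
    connector_name: Optional[str] = None,
    connector_version: Optional[str] = None,
    days: int = 7,
    limit: int = 100,
    successful_only: bool = False,
) -> Dict[str, Any]:
    """Require either the version UUID or the complete name + version pair."""
    has_id = connector_version_id is not None
    has_pair = connector_name is not None and connector_version is not None
    partial_pair = (connector_name is None) != (connector_version is None)
    # Assumption: supplying both forms at once is rejected as ambiguous.
    if partial_pair or has_id == has_pair:
        raise ValueError(
            "Provide connector_version_id OR connector_name + connector_version"
        )
    args: Dict[str, Any] = {"days": days, "limit": limit, "successful_only": successful_only}
    if has_id:
        args["connector_version_id"] = connector_version_id
    else:
        args["connector_name"] = connector_name
        args["connector_version"] = connector_version
    return args


print(build_version_syncs_args(connector_name="source-pokeapi", connector_version="0.3.59"))
print(derive_docker_repository("source-pokeapi"))
```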
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connector_version_id | string \| null | no | null | Connector version UUID. Provide this OR connector_name + connector_version. |
| connector_name | string \| null | no | null | Canonical connector name (e.g. source-pokeapi, destination-duckdb). Used with connector_version to resolve the version UUID. |
| connector_version | string \| null | no | null | Semver version tag (e.g. 0.3.59). Used with connector_name to resolve the version UUID. |
| days | integer | no | 7 | Number of days to look back (default: 7) |
| limit | integer | no | 100 | Maximum number of results (default: 100) |
| successful_only | boolean | no | false | If True, only return successful syncs (default: False) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connector_version_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Connector version UUID. Provide this OR connector_name + connector_version."
},
"connector_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Canonical connector name (e.g. `source-pokeapi`, `destination-duckdb`). Used with `connector_version` to resolve the version UUID."
},
"connector_version": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Semver version tag (e.g. `0.3.59`). Used with `connector_name` to resolve the version UUID."
},
"days": {
"default": 7,
"description": "Number of days to look back (default: 7)",
"type": "integer"
},
"limit": {
"default": 100,
"description": "Maximum number of results (default: 100)",
"type": "integer"
},
"successful_only": {
"default": false,
"description": "If `True`, only return successful syncs (default: `False`)",
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_workspace_info
Hints: read-only · idempotent
Get workspace information including dataplane group.
Returns details about a specific workspace, including which dataplane (region) it belongs to. Useful for determining if a workspace is in the EU region for filtering purposes.
Returns a dict with keys: workspace_id, workspace_name, slug, organization_id, dataplane_group_id, dataplane_name, created_at, tombstone. Returns None if the workspace is not found.
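Since the result may be None, callers should handle the not-found case before reading `dataplane_name`. The sketch below is a hypothetical helper. It assumes the EU dataplane is named exactly "EU" (compared case-insensitively), which matches the region names mentioned in this reference but is not confirmed as the canonical value.

```python
from typing import Any, Dict, Optional


def is_eu_workspace(info: Optional[Dict[str, Any]]) -> Optional[bool]:
    """Return True/False for a found workspace, or None when the lookup returned None."""
    if info is None:
        return None
    # Assumption: the EU region's dataplane_name is "EU".
    name = info.get("dataplane_name") or ""
    return name.strip().upper() == "EU"


# Hypothetical lookups: an EU workspace, a US workspace, and a missing workspace.
print(is_eu_workspace({"workspace_id": "ws-1", "dataplane_name": "EU"}))
print(is_eu_workspace({"workspace_id": "ws-2", "dataplane_name": "US"}))
print(is_eu_workspace(None))
```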
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| enum("266ebdfe-0d7b-4540-9817-de7e4505ba61") | yes | — | Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"description": "Workspace ID aliases that can be used in place of UUIDs.\n\nEach member's name is the alias (e.g., \"@devin-ai-sandbox\") and its value\nis the actual workspace UUID. Use `WorkspaceAliasEnum.resolve()` to\nresolve aliases to actual IDs.",
"enum": [
"266ebdfe-0d7b-4540-9817-de7e4505ba61"
],
"type": "string"
}
],
"description": "Workspace UUID or alias to look up. Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace."
}
},
"required": [
"workspace_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "null"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
query_prod_workspaces
Hints: read-only · idempotent
Search workspaces by name substring or email domain.
At least one of name_contains or email_domain must be provided.
When name_contains is given, performs a case-insensitive substring match on workspace name and slug. When email_domain is given, matches workspaces by user email domain.
The returned organization IDs can be used with other tools like query_prod_connections_by_connector to find connections within those organizations for safe testing.
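The "at least one of" rule and the bare-domain requirement for `email_domain` can be enforced before the call. The following sketch is a hypothetical client-side helper. Stripping anything up to and including an '@' is a convenience added for this example, not something the tool is documented to do.

```python
from typing import Any, Dict, Optional


def build_workspace_search_args(
    name_contains: Optional[str] = None,
    email_domain: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Return an arguments dict, requiring at least one search criterion."""
    if not name_contains and not email_domain:
        raise ValueError("Provide at least one of name_contains or email_domain")
    args: Dict[str, Any] = {"limit": limit}
    if name_contains:
        args["name_contains"] = name_contains
    if email_domain:
        # The parameter expects a bare domain, so keep only the part after
        # the last '@' if one was supplied.
        args["email_domain"] = email_domain.rsplit("@", 1)[-1]
    return args


print(build_workspace_search_args(email_domain="@motherduck.com"))
```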
Parameters:
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name_contains | string \| null | no | null | Case-insensitive substring to search for in workspace name or slug. For example, 'acme' will match workspaces named 'Acme Staging'. |
| email_domain | string \| null | no | null | Email domain to search for (e.g., 'motherduck.com'). Do not include the '@' symbol. |
| limit | integer | no | 100 | Maximum number of workspaces to return (default: 100) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Case-insensitive substring to search for in workspace name or slug. For example, 'acme' will match workspaces named 'Acme Staging'."
},
"email_domain": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Email domain to search for (e.g., 'motherduck.com'). Do not include the '@' symbol."
},
"limit": {
"default": 100,
"description": "Maximum number of workspaces to return (default: 100)",
"type": "integer"
}
},
"type": "object"
}
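The input rules for query_prod_workspaces (at least one of name_contains or email_domain, no '@' in the domain, limit defaulting to 100) could be checked on the caller's side before the tool is invoked. This is a minimal sketch under that assumption: `build_workspace_search_args` is a hypothetical helper, not part of the module, and the server may validate differently.

```python
from typing import Any, Dict, Optional


def build_workspace_search_args(
    name_contains: Optional[str] = None,
    email_domain: Optional[str] = None,
    limit: int = 100,
) -> Dict[str, Any]:
    """Build an arguments dict matching the input schema above."""
    if not name_contains and not email_domain:
        raise ValueError("Provide at least one of name_contains or email_domain.")
    args: Dict[str, Any] = {"limit": limit}
    if name_contains:
        args["name_contains"] = name_contains
    if email_domain:
        # The tool expects the bare domain, so drop a leading '@' if present.
        args["email_domain"] = email_domain.lstrip("@")
    return args


print(build_workspace_search_args(email_domain="@motherduck.com"))
```

Omitted filters are left out of the dict rather than sent as null, which is equivalent under the schema's defaults.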
Show output JSON schema
{
"description": "Result of searching workspaces by name or email domain.",
"properties": {
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name substring that was searched for"
},
"email_domain": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The email domain that was searched for (e.g., 'motherduck.com')"
},
"total_workspaces_found": {
"description": "Total number of workspaces matching",
"type": "integer"
},
"unique_organization_ids": {
"description": "List of unique organization IDs found",
"items": {
"type": "string"
},
"type": "array"
},
"workspaces": {
"description": "List of matching workspaces",
"items": {
"description": "Information about a workspace.",
"properties": {
"organization_id": {
"description": "The organization UUID",
"type": "string"
},
"workspace_id": {
"description": "The workspace UUID",
"type": "string"
},
"workspace_name": {
"description": "The name of the workspace",
"type": "string"
},
"slug": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The workspace slug (URL-friendly identifier)"
},
"email": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The email address associated with the workspace"
},
"dataplane_group_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The dataplane group UUID (region)"
},
"dataplane_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the dataplane (e.g., 'US', 'EU')"
},
"created_at": {
"anyOf": [
{
"format": "date-time",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "When the workspace was created"
},
"customer_tier": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Customer tier (TIER_0, TIER_1, or TIER_2). Enriched from BigQuery tier cache."
},
"is_eu": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Whether the workspace is in the EU region (derived from dataplane_name)."
}
},
"required": [
"organization_id",
"workspace_id",
"workspace_name"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"total_workspaces_found",
"unique_organization_ids",
"workspaces"
],
"type": "object"
}
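A result shaped like the output schema above can be post-processed with ordinary dictionary access. The sketch below groups EU workspace IDs by organization; `eu_workspaces_by_org` is a hypothetical helper and the sample payload is made-up illustration data, not real query output.

```python
from typing import Any, Dict, List


def eu_workspaces_by_org(result: Dict[str, Any]) -> Dict[str, List[str]]:
    """Group the IDs of EU workspaces by their organization ID."""
    grouped: Dict[str, List[str]] = {}
    for workspace in result["workspaces"]:
        # is_eu is nullable in the schema, so only an explicit True counts.
        if workspace.get("is_eu") is True:
            grouped.setdefault(workspace["organization_id"], []).append(
                workspace["workspace_id"]
            )
    return grouped


# Made-up sample that follows the required fields of the output schema.
sample_result = {
    "total_workspaces_found": 3,
    "unique_organization_ids": ["org-a", "org-b"],
    "workspaces": [
        {"organization_id": "org-a", "workspace_id": "ws-1", "workspace_name": "One", "is_eu": True},
        {"organization_id": "org-a", "workspace_id": "ws-2", "workspace_name": "Two", "is_eu": None},
        {"organization_id": "org-b", "workspace_id": "ws-3", "workspace_name": "Three", "is_eu": True},
    ],
}

print(eu_workspaces_by_org(sample_result))
```

Only `organization_id`, `workspace_id`, and `workspace_name` are required by the schema, so the optional fields are read with `.get()`.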
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2"""MCP tools for querying the Airbyte Cloud Prod DB Replica. 3 4This module provides MCP tools that wrap the query functions from 5airbyte_ops_mcp.prod_db_access.queries for use by AI agents. 6 7## MCP reference 8 9.. include:: ../../../docs/mcp-generated/prod_db_queries.md 10 :start-line: 2 11""" 12 13from __future__ import annotations 14 15__all__: list[str] = [] 16 17import json 18from datetime import datetime, timezone 19from enum import StrEnum 20from typing import Annotated, Any 21 22from airbyte.exceptions import PyAirbyteInputError 23from fastmcp import FastMCP 24from fastmcp_extensions import mcp_tool, register_mcp_tools 25from pydantic import BaseModel, Field 26 27from airbyte_ops_mcp.cloud_admin.registry_lookup import ( 28 resolve_canonical_name_to_definition_id, 29) 30from airbyte_ops_mcp.constants import OrganizationAliasEnum, WorkspaceAliasEnum 31from airbyte_ops_mcp.prod_db_access.queries import ( 32 is_source_connector, 33 query_actors_pinned_to_version, 34 query_connection_sync_activity_from_prod, 35 query_connections_by_connector, 36 query_connections_by_destination_connector, 37 query_connections_by_stream, 38 query_connector_rollouts, 39 query_connector_versions, 40 query_dataplanes_list, 41 query_destination_connection_stats, 42 query_failed_sync_attempts_for_connector, 43 query_new_connector_releases, 44 query_recent_syncs_for_connector, 45 query_source_connection_stats, 46 query_syncs_for_connector_version, 47 query_versions_with_pins, 48 query_workspace_info, 49 query_workspaces_by_email_domain, 50 resolve_version_id_by_tag, 51 resolve_version_info, 52 search_organizations, 53 search_workspaces, 54) 55from airbyte_ops_mcp.tier_cache import ( 56 TierFilter, 57 enrich_rows_by_org, 58 filter_rows_by_tier, 59 get_org_tiers, 60) 61 62 63class StatusFilter(StrEnum): 64 """Filter for job status in sync queries.""" 65 66 ALL = "all" 67 SUCCEEDED = "succeeded" 68 FAILED = "failed" 69 70 71# 
Cloud UI base URL for building connection URLs 72CLOUD_UI_BASE_URL = "https://cloud.airbyte.com" 73 74 75def _validate_sync_activity_scope( 76 *, 77 organization_id: str | None, 78 workspace_id: str | None, 79 connection_ids: list[str] | None, 80) -> None: 81 """Require at least one explicit scope filter for sync activity queries.""" 82 if organization_id or workspace_id or connection_ids: 83 return 84 raise PyAirbyteInputError( 85 message=( 86 "Provide at least one scope filter: `organization_id`, `workspace_id`, " 87 "or `connection_ids`." 88 ), 89 context={ 90 "organization_id": organization_id, 91 "workspace_id": workspace_id, 92 "connection_ids": connection_ids, 93 }, 94 ) 95 96 97def _validate_sync_activity_window( 98 *, 99 start_at: datetime, 100 end_at: datetime, 101) -> tuple[datetime, datetime]: 102 """Validate that `start_at` and `end_at` describe a usable window. 103 104 Returns the timestamps normalized to UTC. Raises `PyAirbyteInputError` for 105 naive timestamps or inverted ranges. No clock-relative caps are enforced 106 here; the caller is trusted to choose a sensible window. 
107 """ 108 if start_at.tzinfo is None or end_at.tzinfo is None: 109 raise PyAirbyteInputError( 110 message="`start_at` and `end_at` must include timezone information.", 111 context={ 112 "start_at": start_at.isoformat(), 113 "end_at": end_at.isoformat(), 114 }, 115 ) 116 117 normalized_start = start_at.astimezone(timezone.utc) 118 normalized_end = end_at.astimezone(timezone.utc) 119 120 if normalized_start >= normalized_end: 121 raise PyAirbyteInputError( 122 message="`start_at` must be earlier than `end_at`.", 123 context={ 124 "start_at": normalized_start.isoformat(), 125 "end_at": normalized_end.isoformat(), 126 }, 127 ) 128 return normalized_start, normalized_end 129 130 131# ============================================================================= 132# Pydantic Models for MCP Tool Responses 133# ============================================================================= 134 135 136class OrganizationSearchHit(BaseModel): 137 """A single organization returned by a name/email search.""" 138 139 organization_id: str = Field(description="The organization UUID") 140 organization_name: str = Field(description="The name of the organization") 141 email: str | None = Field( 142 default=None, description="The email address associated with the organization" 143 ) 144 customer_tier: str | None = Field( 145 default=None, 146 description="Customer tier (TIER_0, TIER_1, or TIER_2). 
Enriched from BigQuery tier cache.", 147 ) 148 149 150class OrganizationSearchResult(BaseModel): 151 """Result of searching organizations by name substring.""" 152 153 name_contains: str = Field(description="The search substring that was used") 154 total_found: int = Field(description="Total number of organizations matching") 155 organizations: list[OrganizationSearchHit] = Field( 156 description="List of matching organizations" 157 ) 158 159 160class WorkspaceInfo(BaseModel): 161 """Information about a workspace.""" 162 163 organization_id: str = Field(description="The organization UUID") 164 workspace_id: str = Field(description="The workspace UUID") 165 workspace_name: str = Field(description="The name of the workspace") 166 slug: str | None = Field( 167 default=None, description="The workspace slug (URL-friendly identifier)" 168 ) 169 email: str | None = Field( 170 default=None, description="The email address associated with the workspace" 171 ) 172 dataplane_group_id: str | None = Field( 173 default=None, description="The dataplane group UUID (region)" 174 ) 175 dataplane_name: str | None = Field( 176 default=None, description="The name of the dataplane (e.g., 'US', 'EU')" 177 ) 178 created_at: datetime | None = Field( 179 default=None, description="When the workspace was created" 180 ) 181 customer_tier: str | None = Field( 182 default=None, 183 description="Customer tier (TIER_0, TIER_1, or TIER_2). 
Enriched from BigQuery tier cache.", 184 ) 185 is_eu: bool | None = Field( 186 default=None, 187 description="Whether the workspace is in the EU region (derived from dataplane_name).", 188 ) 189 190 191class WorkspaceSearchResult(BaseModel): 192 """Result of searching workspaces by name or email domain.""" 193 194 name_contains: str | None = Field( 195 default=None, description="The name substring that was searched for" 196 ) 197 email_domain: str | None = Field( 198 default=None, 199 description="The email domain that was searched for (e.g., 'motherduck.com')", 200 ) 201 total_workspaces_found: int = Field( 202 description="Total number of workspaces matching" 203 ) 204 unique_organization_ids: list[str] = Field( 205 description="List of unique organization IDs found" 206 ) 207 workspaces: list[WorkspaceInfo] = Field(description="List of matching workspaces") 208 209 210# Keep backward-compatible alias for any external references 211WorkspacesByEmailDomainResult = WorkspaceSearchResult 212 213 214class LatestAttemptBreakdown(BaseModel): 215 """Breakdown of connections by latest attempt status.""" 216 217 succeeded: int = Field( 218 default=0, description="Connections where latest attempt succeeded" 219 ) 220 failed: int = Field( 221 default=0, description="Connections where latest attempt failed" 222 ) 223 cancelled: int = Field( 224 default=0, description="Connections where latest attempt was cancelled" 225 ) 226 running: int = Field( 227 default=0, description="Connections where latest attempt is still running" 228 ) 229 unknown: int = Field( 230 default=0, 231 description="Connections with no recent attempts in the lookback window", 232 ) 233 234 235class VersionPinStats(BaseModel): 236 """Stats for connections pinned to a specific version.""" 237 238 pinned_version_id: str | None = Field( 239 description="The connector version UUID (None for unpinned connections)" 240 ) 241 docker_image_tag: str | None = Field( 242 default=None, description="The docker image 
tag for this version" 243 ) 244 total_connections: int = Field(description="Total number of connections") 245 enabled_connections: int = Field( 246 description="Number of enabled (active status) connections" 247 ) 248 active_connections: int = Field( 249 description="Number of connections with recent sync activity" 250 ) 251 latest_attempt: LatestAttemptBreakdown = Field( 252 description="Breakdown by latest attempt status" 253 ) 254 255 256class ConnectorConnectionStats(BaseModel): 257 """Aggregate connection stats for a connector.""" 258 259 connector_definition_id: str = Field(description="The connector definition UUID") 260 connector_type: str = Field(description="'source' or 'destination'") 261 canonical_name: str | None = Field( 262 default=None, description="The canonical connector name if resolved" 263 ) 264 total_connections: int = Field( 265 description="Total number of non-deprecated connections" 266 ) 267 enabled_connections: int = Field( 268 description="Number of enabled (active status) connections" 269 ) 270 active_connections: int = Field( 271 description="Number of connections with recent sync activity" 272 ) 273 pinned_connections: int = Field( 274 description="Number of connections with explicit version pins" 275 ) 276 unpinned_connections: int = Field( 277 description="Number of connections on default version" 278 ) 279 latest_attempt: LatestAttemptBreakdown = Field( 280 description="Overall breakdown by latest attempt status" 281 ) 282 by_version: list[VersionPinStats] = Field( 283 description="Stats broken down by pinned version" 284 ) 285 286 287class ConnectorConnectionStatsResponse(BaseModel): 288 """Response containing connection stats for multiple connectors.""" 289 290 sources: list[ConnectorConnectionStats] = Field( 291 default_factory=list, description="Stats for source connectors" 292 ) 293 destinations: list[ConnectorConnectionStats] = Field( 294 default_factory=list, description="Stats for destination connectors" 295 ) 296 
lookback_days: int = Field( 297 description="Lookback window used for 'active' connections" 298 ) 299 generated_at: datetime = Field(description="When this response was generated") 300 301 302def _opt_str(value: Any) -> str | None: 303 """Convert a nullable value to str, returning None if the value is None/falsy.""" 304 return str(value) if value else None 305 306 307@mcp_tool( 308 read_only=True, 309 idempotent=True, 310) 311def query_prod_dataplanes() -> list[dict[str, Any]]: 312 """List all dataplane groups with workspace counts. 313 314 Returns information about all active dataplane groups in Airbyte Cloud, 315 including the number of workspaces in each. Useful for understanding 316 the distribution of workspaces across regions (US, US-Central, EU). 317 318 Returns list of dicts with keys: dataplane_group_id, dataplane_name, 319 organization_id, enabled, tombstone, created_at, workspace_count 320 """ 321 return query_dataplanes_list() 322 323 324@mcp_tool( 325 read_only=True, 326 idempotent=True, 327) 328def query_prod_workspace_info( 329 workspace_id: Annotated[ 330 str | WorkspaceAliasEnum, 331 Field( 332 description="Workspace UUID or alias to look up. " 333 "Accepts '@devin-ai-sandbox' as an alias for the Devin AI sandbox workspace." 334 ), 335 ], 336) -> dict[str, Any] | None: 337 """Get workspace information including dataplane group. 338 339 Returns details about a specific workspace, including which dataplane 340 (region) it belongs to. Useful for determining if a workspace is in 341 the EU region for filtering purposes. 342 343 Returns dict with keys: workspace_id, workspace_name, slug, organization_id, 344 dataplane_group_id, dataplane_name, created_at, tombstone 345 Or None if workspace not found. 
346 """ 347 # Resolve workspace ID alias (workspace_id is required, so resolved value is never None) 348 resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id) 349 assert resolved_workspace_id is not None # Type narrowing: workspace_id is required 350 351 return query_workspace_info(resolved_workspace_id) 352 353 354@mcp_tool( 355 read_only=True, 356 idempotent=True, 357) 358def query_prod_connector_versions( 359 connector_definition_id: Annotated[ 360 str, 361 Field(description="Connector definition UUID to list versions for"), 362 ], 363) -> list[dict[str, Any]]: 364 """List all versions for a connector definition. 365 366 Returns all published versions of a connector, ordered by last_published 367 date descending. Useful for understanding version history and finding 368 specific version IDs for pinning or rollout monitoring. 369 370 Returns list of dicts with keys: version_id, docker_image_tag, docker_repository, 371 release_stage, support_level, cdk_version, language, last_published, release_date 372 """ 373 return query_connector_versions(connector_definition_id) 374 375 376@mcp_tool( 377 read_only=True, 378 idempotent=True, 379) 380def query_prod_new_connector_releases( 381 days: Annotated[ 382 int, 383 Field(description="Number of days to look back (default: 7)", default=7), 384 ] = 7, 385 limit: Annotated[ 386 int, 387 Field(description="Maximum number of results (default: 100)", default=100), 388 ] = 100, 389) -> list[dict[str, Any]]: 390 """List recently published connector versions. 391 392 Returns connector versions published within the specified number of days. 393 Uses last_published timestamp which reflects when the version was actually 394 deployed to the registry (not the changelog date). 
395 396 Returns list of dicts with keys: version_id, connector_definition_id, docker_repository, 397 docker_image_tag, last_published, release_date, release_stage, support_level, 398 cdk_version, language, created_at 399 """ 400 return query_new_connector_releases(days=days, limit=limit) 401 402 403@mcp_tool( 404 read_only=True, 405 idempotent=True, 406) 407def query_prod_actors_by_pinned_connector_version( 408 connector_version_id: Annotated[ 409 str, 410 Field(description="Connector version UUID to find pinned instances for"), 411 ], 412) -> list[dict[str, Any]]: 413 """List actors (sources/destinations) effectively pinned to a specific connector version. 414 415 Returns all actors that are effectively pinned to a specific connector version, 416 considering all scope levels: actor-level pins, workspace-level pins, and 417 organization-level pins (with actor > workspace > organization precedence). 418 Useful for monitoring rollouts and understanding which customers are affected. 419 420 The actor_id field is the actor ID (superset of source_id/destination_id). 421 422 Returns list of dicts with keys: actor_id, connector_definition_id, origin_type, 423 origin, description, created_at, expires_at, pin_scope_type, actor_name, 424 workspace_id, workspace_name, organization_id, dataplane_group_id, dataplane_name 425 426 pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope 427 level the effective pin came from. 428 """ 429 return query_actors_pinned_to_version(connector_version_id) 430 431 432@mcp_tool( 433 read_only=True, 434 idempotent=True, 435) 436def query_prod_recent_syncs_for_connector_version( 437 connector_version_id: Annotated[ 438 str | None, 439 Field( 440 description=( 441 "Connector version UUID. Provide this OR " 442 "connector_name + connector_version." 443 ), 444 default=None, 445 ), 446 ] = None, 447 connector_name: Annotated[ 448 str | None, 449 Field( 450 description=( 451 "Canonical connector name (e.g. 
`source-pokeapi`, " 452 "`destination-duckdb`). Used with `connector_version` to " 453 "resolve the version UUID." 454 ), 455 default=None, 456 ), 457 ] = None, 458 connector_version: Annotated[ 459 str | None, 460 Field( 461 description=( 462 "Semver version tag (e.g. `0.3.59`). " 463 "Used with `connector_name` to resolve the version UUID." 464 ), 465 default=None, 466 ), 467 ] = None, 468 days: Annotated[ 469 int, 470 Field(description="Number of days to look back (default: 7)", default=7), 471 ] = 7, 472 limit: Annotated[ 473 int, 474 Field(description="Maximum number of results (default: 100)", default=100), 475 ] = 100, 476 successful_only: Annotated[ 477 bool, 478 Field( 479 description="If `True`, only return successful syncs (default: `False`)", 480 default=False, 481 ), 482 ] = False, 483) -> list[dict[str, Any]]: 484 """List sync jobs that were run with a specific connector version. 485 486 Works for both source and destination connectors. Automatically detects 487 the connector type from the version metadata and uses the appropriate 488 query variant. 489 490 Accepts either `connector_version_id` (UUID) or `connector_name` + 491 `connector_version` (e.g. `source-pokeapi` + `0.3.59`). When using 492 name + version, the `docker_repository` is derived from the canonical 493 name (e.g. `source-pokeapi` → `airbyte/source-pokeapi`). 494 495 Filters on the version stamped into `jobs.config` at job-creation time, 496 not the current pin state. This avoids false positives (pre-pin syncs 497 counted as RC) and false negatives (post-unpin syncs missed). 498 499 Pin columns (`pin_origin_type`, `pin_origin`, `pin_scope_type`) are 500 still included as informational output but are not used for filtering. 
501 502 Returns list of dicts with keys: `job_id`, `connection_id`, `job_status`, 503 `started_at`, `job_updated_at`, `connection_name`, `actor_id`, `actor_name`, 504 `actor_definition_id`, `source_definition_version_id`, 505 `destination_definition_version_id`, `pin_origin_type`, 506 `pin_origin`, `pin_scope_type`, `workspace_id`, `workspace_name`, 507 `organization_id`, `dataplane_group_id`, `dataplane_name`. 508 """ 509 # Resolve inputs to a version UUID and connector type. 510 if connector_version_id is not None: 511 version_info = resolve_version_info(connector_version_id) 512 docker_repository = version_info["docker_repository"] 513 elif connector_name is not None and connector_version is not None: 514 # Derive docker_repository from canonical name. 515 docker_repository = f"airbyte/{connector_name}" 516 version_info = resolve_version_id_by_tag( 517 docker_repository=docker_repository, 518 docker_image_tag=connector_version, 519 ) 520 connector_version_id = version_info["version_id"] 521 else: 522 raise PyAirbyteInputError( 523 message=( 524 "Provide either `connector_version_id` or both " 525 "`connector_name` and `connector_version`." 526 ), 527 ) 528 529 is_destination = not is_source_connector(docker_repository) 530 return query_syncs_for_connector_version( 531 connector_version_id, 532 is_destination=is_destination, 533 days=days, 534 limit=limit, 535 successful_only=successful_only, 536 ) 537 538 539@mcp_tool( 540 read_only=True, 541 idempotent=True, 542 open_world=True, 543) 544def query_prod_recent_syncs_for_connector( 545 source_definition_id: Annotated[ 546 str | None, 547 Field( 548 description=( 549 "Source connector definition ID (UUID) to search for. " 550 "Provide this OR source_canonical_name OR destination_definition_id " 551 "OR destination_canonical_name (exactly one required). " 552 "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics." 
553 ), 554 default=None, 555 ), 556 ], 557 source_canonical_name: Annotated[ 558 str | None, 559 Field( 560 description=( 561 "Canonical source connector name to search for. " 562 "Provide this OR source_definition_id OR destination_definition_id " 563 "OR destination_canonical_name (exactly one required). " 564 "Examples: 'source-youtube-analytics', 'YouTube Analytics'." 565 ), 566 default=None, 567 ), 568 ], 569 destination_definition_id: Annotated[ 570 str | None, 571 Field( 572 description=( 573 "Destination connector definition ID (UUID) to search for. " 574 "Provide this OR destination_canonical_name OR source_definition_id " 575 "OR source_canonical_name (exactly one required). " 576 "Example: '94bd199c-2ff0-4aa2-b98e-17f0acb72610' for DuckDB." 577 ), 578 default=None, 579 ), 580 ], 581 destination_canonical_name: Annotated[ 582 str | None, 583 Field( 584 description=( 585 "Canonical destination connector name to search for. " 586 "Provide this OR destination_definition_id OR source_definition_id " 587 "OR source_canonical_name (exactly one required). " 588 "Examples: 'destination-duckdb', 'DuckDB'." 589 ), 590 default=None, 591 ), 592 ], 593 status_filter: Annotated[ 594 StatusFilter, 595 Field( 596 description=( 597 "Filter by job status: 'all' (default), 'succeeded', or 'failed'. " 598 "Use 'succeeded' to find healthy connections with recent successful syncs. " 599 "Use 'failed' to find connections with recent failures." 600 ), 601 default=StatusFilter.ALL, 602 ), 603 ], 604 organization_id: Annotated[ 605 str | OrganizationAliasEnum | None, 606 Field( 607 description=( 608 "Optional organization ID (UUID) or alias to filter results. " 609 "If provided, only syncs from this organization will be returned. " 610 "Accepts '@airbyte-internal' as an alias for the Airbyte internal org." 
611 ), 612 default=None, 613 ), 614 ], 615 lookback_days: Annotated[ 616 int, 617 Field(description="Number of days to look back (default: 7)", default=7), 618 ], 619 limit: Annotated[ 620 int, 621 Field(description="Maximum number of results (default: 100)", default=100), 622 ], 623 customer_tier_filter: Annotated[ 624 TierFilter, 625 Field( 626 description=( 627 "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. " 628 "Filters results to only include connections belonging to organizations " 629 "in the specified tier. Use 'ALL' to include all tiers." 630 ), 631 ), 632 ] = "TIER_2", 633 *, 634 exclude_pinned: Annotated[ 635 bool, 636 Field( 637 description=( 638 "If True, exclude syncs for actors that are already pinned to a " 639 "specific version (at any scope level: actor, workspace, or organization). " 640 "Useful for 'prove fix' workflows where you want to find unpinned " 641 "connections for live testing. Default: False (include all syncs)." 642 ), 643 default=False, 644 ), 645 ], 646 enabled_schedules_only: Annotated[ 647 bool, 648 Field( 649 description=( 650 "If True, only return syncs for connections that are both active " 651 "(not paused/inactive) and on an automated sync schedule " 652 "(not manual-trigger-only). Useful for canary workflows where " 653 "you need connections that will produce organic syncs during a " 654 "monitoring window. Default: False (include all connections)." 655 ), 656 default=False, 657 ), 658 ], 659) -> list[dict[str, Any]]: 660 """List recent sync jobs for ALL actors using a connector type. 661 662 This tool finds all actors with the given connector definition and returns their 663 recent sync jobs, regardless of whether they have explicit version pins. It filters 664 out deleted actors, deleted workspaces, and deprecated connections. 665 666 Results are always enriched with customer_tier and is_eu fields. 667 The customer_tier_filter parameter is required to ensure tier-aware querying. 
668 669 Use this tool to: 670 - Find healthy connections with recent successful syncs (status_filter='succeeded') 671 - Investigate connector issues across all users (status_filter='failed') 672 - Get an overview of all recent sync activity (status_filter='all') 673 674 Set `exclude_pinned=True` to filter out syncs for actors that are already pinned to a 675 specific version. This is useful for 'prove fix' live connection testing workflows 676 where you want to find unpinned connections to test against. 677 678 Set `enabled_schedules_only=True` to restrict results to connections that are both 679 enabled (status='active') and on an automated schedule (not manual-trigger-only). 680 This is useful for canary prerelease workflows where you need connections that 681 will run organically during the monitoring window. 682 683 Supports both SOURCE and DESTINATION connectors. Provide exactly one of: 684 source_definition_id, source_canonical_name, destination_definition_id, 685 or destination_canonical_name. 686 687 Key fields in results: 688 - job_status: 'succeeded', 'failed', 'cancelled', etc. 
689 - connection_id, connection_name: The connection that ran the sync 690 - actor_id, actor_name: The source or destination actor 691 - customer_tier: TIER_0, TIER_1, or TIER_2 692 - is_eu: Whether the workspace is in the EU region 693 - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned) 694 - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned) 695 """ 696 # Validate that exactly one connector parameter is provided 697 provided_params = [ 698 source_definition_id, 699 source_canonical_name, 700 destination_definition_id, 701 destination_canonical_name, 702 ] 703 num_provided = sum(p is not None for p in provided_params) 704 if num_provided != 1: 705 raise PyAirbyteInputError( 706 message=( 707 "Exactly one of source_definition_id, source_canonical_name, " 708 "destination_definition_id, or destination_canonical_name must be provided." 709 ), 710 ) 711 712 # Determine if this is a destination connector 713 is_destination = ( 714 destination_definition_id is not None or destination_canonical_name is not None 715 ) 716 717 # Resolve canonical name to definition ID if needed 718 resolved_definition_id: str 719 if source_canonical_name: 720 resolved_definition_id = resolve_canonical_name_to_definition_id( 721 canonical_name=source_canonical_name, 722 ) 723 elif destination_canonical_name: 724 resolved_definition_id = resolve_canonical_name_to_definition_id( 725 canonical_name=destination_canonical_name, 726 ) 727 elif source_definition_id: 728 resolved_definition_id = source_definition_id 729 else: 730 # We've validated exactly one param is provided, so this must be set 731 assert destination_definition_id is not None 732 resolved_definition_id = destination_definition_id 733 734 # Resolve organization ID alias 735 resolved_organization_id = OrganizationAliasEnum.resolve(organization_id) 736 737 rows = query_recent_syncs_for_connector( 738 connector_definition_id=resolved_definition_id, 739 
is_destination=is_destination, 740 status_filter=status_filter, 741 organization_id=resolved_organization_id, 742 days=lookback_days, 743 limit=limit, 744 exclude_pinned=exclude_pinned, 745 enabled_schedules_only=enabled_schedules_only, 746 ) 747 748 enriched = enrich_rows_by_org(rows) 749 return filter_rows_by_tier(enriched, customer_tier_filter) 750 751 752@mcp_tool( 753 read_only=True, 754 idempotent=True, 755 open_world=True, 756) 757def query_prod_failed_sync_attempts_for_connector( 758 source_definition_id: Annotated[ 759 str | None, 760 Field( 761 description=( 762 "Source connector definition ID (UUID) to search for. " 763 "Exactly one of this or source_canonical_name is required. " 764 "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics." 765 ), 766 default=None, 767 ), 768 ] = None, 769 source_canonical_name: Annotated[ 770 str | None, 771 Field( 772 description=( 773 "Canonical source connector name to search for. " 774 "Exactly one of this or source_definition_id is required. " 775 "Examples: 'source-youtube-analytics', 'YouTube Analytics'." 776 ), 777 default=None, 778 ), 779 ] = None, 780 organization_id: Annotated[ 781 str | OrganizationAliasEnum | None, 782 Field( 783 description=( 784 "Optional organization ID (UUID) or alias to filter results. " 785 "If provided, only failed attempts from this organization will be returned. " 786 "Accepts '@airbyte-internal' as an alias for the Airbyte internal org." 787 ), 788 default=None, 789 ), 790 ] = None, 791 lookback_days: Annotated[ 792 int, 793 Field(description="Number of days to look back (default: 7)", default=7), 794 ] = 7, 795 limit: Annotated[ 796 int, 797 Field(description="Maximum number of results (default: 100)", default=100), 798 ] = 100, 799 customer_tier_filter: Annotated[ 800 TierFilter, 801 Field( 802 description=( 803 "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. 
" 804 "Filters results to only include connections belonging to organizations " 805 "in the specified tier. Use 'ALL' to include all tiers." 806 ), 807 ), 808 ] = "TIER_2", 809) -> list[dict[str, Any]]: 810 """List failed sync attempts for ALL actors using a source connector type. 811 812 This tool finds all actors with the given connector definition and returns their 813 failed sync attempts, regardless of whether they have explicit version pins. 814 815 Results are always enriched with customer_tier and is_eu fields. 816 The customer_tier_filter parameter is required to ensure tier-aware querying. 817 818 This is useful for investigating connector issues across all users. Use this when 819 you want to find failures for a connector type regardless of which version users 820 are on. 821 822 Note: This tool only supports SOURCE connectors. For destination connectors, 823 a separate tool would be needed. 824 825 Key fields in results: 826 - failure_summary: JSON containing failure details including failureType and messages 827 - customer_tier: TIER_0, TIER_1, or TIER_2 828 - is_eu: Whether the workspace is in the EU region 829 - pin_origin_type, pin_origin, pinned_version_id: Version pin context (NULL if not pinned) 830 - pin_scope_type: 'actor', 'workspace', or 'organization' (NULL if not pinned) 831 """ 832 # Validate that exactly one of the two parameters is provided 833 if (source_definition_id is None) == (source_canonical_name is None): 834 raise PyAirbyteInputError( 835 message=( 836 "Exactly one of source_definition_id or source_canonical_name " 837 "must be provided, but not both." 
            ),
        )

    # Resolve canonical name to definition ID if needed
    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        resolved_definition_id = source_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = query_failed_sync_attempts_for_connector(
        connector_definition_id=resolved_definition_id,
        organization_id=resolved_organization_id,
        days=lookback_days,
        limit=limit,
    )
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_connector(
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ] = None,
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'source-youtube-analytics', 'YouTube Analytics'."
            ),
            default=None,
        ),
    ] = None,
    destination_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Destination connector definition ID (UUID) to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Example: 'e5c8e66c-a480-4a5e-9c0e-e8e5e4c5c5c5' for DuckDB."
            ),
            default=None,
        ),
    ] = None,
    destination_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical destination connector name to search for. "
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name is required. "
                "Examples: 'destination-duckdb', 'DuckDB'."
            ),
            default=None,
        ),
    ] = None,
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 1000)", default=1000),
    ] = 1000,
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
    *,
    exclude_pinned: Annotated[
        bool,
        Field(
            description=(
                "If True, exclude connections whose connector is already pinned to a "
                "specific version (at any scope level: actor, workspace, or organization). "
                "Useful for 'prove fix' workflows where you want to find unpinned "
                "connections for live testing. Default: False (include all connections)."
            ),
            default=False,
        ),
    ],
    enabled_schedules_only: Annotated[
        bool,
        Field(
            description=(
                "If True, only return connections that are both active "
                "(not paused/inactive) and on an automated sync schedule "
                "(not manual-trigger-only). Useful for canary workflows where "
                "you need connections that will produce organic syncs during a "
                "monitoring window. Default: False (include all connections)."
            ),
            default=False,
        ),
    ],
) -> list[dict[str, Any]]:
    """Search for all connections using a specific source or destination connector type.

    This tool queries the Airbyte Cloud Prod DB Replica directly for fast results.
    It finds all connections where the source or destination connector matches the
    specified type, regardless of how the connector is named by users.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Optionally filter by organization_id to limit results to a specific organization.
    Use '@airbyte-internal' as an alias for the Airbyte internal organization.

    Set `exclude_pinned=True` to filter out connections that are already pinned to a
    specific version. This is useful for 'prove fix' live connection testing workflows
    where you want to find unpinned connections to test against.

    Set `enabled_schedules_only=True` to restrict results to connections that are both
    enabled (status='active') and on an automated schedule (not manual-trigger-only).
    This is useful for canary prerelease workflows where you need connections that
    will run organically during the monitoring window.

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    For source queries, returns: connection_id, connection_name, connection_url, source_id,
    source_name, source_definition_id, workspace_id, workspace_name, organization_id,
    dataplane_group_id, dataplane_name, pin_origin_type, pin_origin, pinned_version_id,
    pin_scope_type, customer_tier, is_eu.
    For destination queries, returns: connection_id, connection_name, connection_url,
    destination_id, destination_name, destination_definition_id, workspace_id,
    workspace_name, organization_id, dataplane_group_id, dataplane_name, pin_origin_type,
    pin_origin, pinned_version_id, pin_scope_type, customer_tier, is_eu.

    pin_scope_type is 'actor', 'workspace', or 'organization' indicating which scope
    level the effective pin came from (NULL if not pinned).
    """
    # Validate that exactly one of the four connector parameters is provided
    provided_params = [
        source_definition_id,
        source_canonical_name,
        destination_definition_id,
        destination_canonical_name,
    ]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id, source_canonical_name, "
                "destination_definition_id, or destination_canonical_name must be provided."
            ),
        )

    # Determine if this is a source or destination query and resolve the definition ID
    is_source_query = (
        source_definition_id is not None or source_canonical_name is not None
    )
    resolved_definition_id: str

    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    elif source_definition_id:
        resolved_definition_id = source_definition_id
    elif destination_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=destination_canonical_name,
        )
    else:
        resolved_definition_id = destination_definition_id  # type: ignore[assignment]

    # Resolve organization ID alias
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    # Query the database based on connector type
    if is_source_query:
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "source_id": str(row["source_id"]),
                "source_name": row.get("source_name", ""),
                "source_definition_id": str(row["source_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
                enabled_schedules_only=enabled_schedules_only,
            )
        ]
    else:
        # Destination query
        rows = [
            {
                "organization_id": str(row.get("organization_id", "")),
                "workspace_id": str(row["workspace_id"]),
                "workspace_name": row.get("workspace_name", ""),
                "connection_id": str(row["connection_id"]),
                "connection_name": row.get("connection_name", ""),
                "connection_url": (
                    f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                    f"/connections/{row['connection_id']}/status"
                ),
                "destination_id": str(row["destination_id"]),
                "destination_name": row.get("destination_name", ""),
                "destination_definition_id": str(row["destination_definition_id"]),
                "dataplane_group_id": str(row.get("dataplane_group_id", "")),
                "dataplane_name": row.get("dataplane_name", ""),
                "pin_origin_type": row.get("pin_origin_type"),
                "pin_origin": row.get("pin_origin"),
                "pinned_version_id": _opt_str(row.get("pinned_version_id")),
                "pin_scope_type": row.get("pin_scope_type"),
            }
            for row in query_connections_by_destination_connector(
                connector_definition_id=resolved_definition_id,
                organization_id=resolved_organization_id,
                limit=limit,
                exclude_pinned=exclude_pinned,
                enabled_schedules_only=enabled_schedules_only,
            )
        ]

    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connections_by_stream(
    stream_name: Annotated[
        str,
        Field(
            description=(
                "Name of the stream to search for in connection catalogs. "
                "This must match the exact stream name as configured in the connection. "
                "Examples: 'global_exclusions', 'campaigns', 'users'."
            ),
        ),
    ],
    source_definition_id: Annotated[
        str | None,
        Field(
            description=(
                "Source connector definition ID (UUID) to search for. "
                "Provide this OR source_canonical_name (exactly one required). "
                "Example: 'afa734e4-3571-11ec-991a-1e0031268139' for YouTube Analytics."
            ),
            default=None,
        ),
    ],
    source_canonical_name: Annotated[
        str | None,
        Field(
            description=(
                "Canonical source connector name to search for. "
                "Provide this OR source_definition_id (exactly one required). "
                "Examples: 'source-klaviyo', 'Klaviyo', 'source-youtube-analytics'."
            ),
            default=None,
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization ID (UUID) or alias to filter results. "
                "If provided, only connections in this organization will be returned. "
                "Accepts '@airbyte-internal' as an alias for the Airbyte internal org."
            ),
            default=None,
        ),
    ],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)", default=100),
    ],
    customer_tier_filter: Annotated[
        TierFilter,
        Field(
            description=(
                "Required tier filter: 'TIER_0', 'TIER_1', 'TIER_2', or 'ALL'. "
                "Filters results to only include connections belonging to organizations "
                "in the specified tier. Use 'ALL' to include all tiers."
            ),
        ),
    ] = "TIER_2",
) -> list[dict[str, Any]]:
    """Find connections that have a specific stream enabled in their catalog.

    This tool searches the connection's configured catalog (JSONB) for streams
    matching the specified name. It's particularly useful when validating
    connector fixes that affect specific streams - you can quickly find
    customer connections that use the affected stream.

    Results are always enriched with customer_tier and is_eu fields.
    The customer_tier_filter parameter is required to ensure tier-aware querying.

    Use cases:
    - Finding connections with a specific stream enabled for regression testing
    - Validating connector fixes that affect particular streams
    - Identifying which customers use rarely-enabled streams

    Returns a list of connection dicts with workspace context and clickable Cloud UI URLs.
    """
    provided_params = [source_definition_id, source_canonical_name]
    num_provided = sum(p is not None for p in provided_params)
    if num_provided != 1:
        raise PyAirbyteInputError(
            message=(
                "Exactly one of source_definition_id or source_canonical_name "
                "must be provided."
            ),
        )

    resolved_definition_id: str
    if source_canonical_name:
        resolved_definition_id = resolve_canonical_name_to_definition_id(
            canonical_name=source_canonical_name,
        )
    else:
        assert source_definition_id is not None
        resolved_definition_id = source_definition_id

    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)

    rows = [
        {
            "organization_id": str(row.get("organization_id", "")),
            "workspace_id": str(row["workspace_id"]),
            "workspace_name": row.get("workspace_name", ""),
            "connection_id": str(row["connection_id"]),
            "connection_name": row.get("connection_name", ""),
            "connection_status": row.get("connection_status", ""),
            "connection_url": (
                f"{CLOUD_UI_BASE_URL}/workspaces/{row['workspace_id']}"
                f"/connections/{row['connection_id']}/status"
            ),
            "source_id": str(row["source_id"]),
            "source_name": row.get("source_name", ""),
            "source_definition_id": str(row["source_definition_id"]),
            "dataplane_group_id": str(row.get("dataplane_group_id", "")),
            "dataplane_name": row.get("dataplane_name", ""),
        }
        for row in query_connections_by_stream(
            connector_definition_id=resolved_definition_id,
            stream_name=stream_name,
            organization_id=resolved_organization_id,
            limit=limit,
        )
    ]
    enriched = enrich_rows_by_org(rows)
    return filter_rows_by_tier(enriched, customer_tier_filter)


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_organizations(
    name_contains: Annotated[
        str,
        Field(
            description=(
                "Case-insensitive substring to search for in organization name or email. "
                "For example, 'acme' will match organizations named 'Acme Corp' or "
                "with email 'admin@acme.io'."
            ),
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of organizations to return (default: 20)",
            default=20,
        ),
    ] = 20,
) -> OrganizationSearchResult:
    """Search organizations by name or email substring.

    Performs a case-insensitive substring match on organization name and email.
    Use the returned `organization_id` values with other tools like
    `query_prod_connections_by_connector` or `lookup_customer_tiers`.
    """
    rows = search_organizations(name_contains=name_contains, limit=limit)

    orgs = [
        OrganizationSearchHit(
            organization_id=str(row["organization_id"]),
            organization_name=row.get("organization_name", ""),
            email=row.get("email"),
        )
        for row in rows
    ]

    # Enrich with tier annotation
    org_ids = [o.organization_id for o in orgs]
    tier_results = {r.organization_id: r for r in get_org_tiers(org_ids)}
    for org in orgs:
        tier_result = tier_results.get(org.organization_id)
        if tier_result:
            org.customer_tier = tier_result.customer_tier

    return OrganizationSearchResult(
        name_contains=name_contains,
        total_found=len(orgs),
        organizations=orgs,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_workspaces(
    name_contains: Annotated[
        str | None,
        Field(
            description=(
                "Case-insensitive substring to search for in workspace name or slug. "
                "For example, 'acme' will match workspaces named 'Acme Staging'."
            ),
            default=None,
        ),
    ] = None,
    email_domain: Annotated[
        str | None,
        Field(
            description=(
                "Email domain to search for (e.g., 'motherduck.com'). "
                "Do not include the '@' symbol."
            ),
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(
            description="Maximum number of workspaces to return (default: 100)",
            default=100,
        ),
    ] = 100,
) -> WorkspaceSearchResult:
    """Search workspaces by name substring or email domain.

    At least one of `name_contains` or `email_domain` must be provided.
    When `name_contains` is given, performs a case-insensitive substring match
    on workspace name and slug. When `email_domain` is given, matches
    workspaces by user email domain.

    The returned organization IDs can be used with other tools like
    `query_prod_connections_by_connector` to find connections within
    those organizations for safe testing.
    """
    if not name_contains and not email_domain:
        raise PyAirbyteInputError(
            message="At least one of `name_contains` or `email_domain` must be provided.",
        )

    if name_contains:
        rows = search_workspaces(name_contains=name_contains, limit=limit)
    else:
        assert email_domain is not None
        clean_domain = email_domain.lstrip("@")
        rows = query_workspaces_by_email_domain(email_domain=clean_domain, limit=limit)

    workspaces = [
        WorkspaceInfo(
            organization_id=str(row["organization_id"]),
            workspace_id=str(row["workspace_id"]),
            workspace_name=row.get("workspace_name", ""),
            slug=row.get("slug"),
            email=row.get("email"),
            dataplane_group_id=_opt_str(row.get("dataplane_group_id")),
            dataplane_name=row.get("dataplane_name"),
            created_at=row.get("created_at"),
        )
        for row in rows
    ]

    # Enrich with tier annotation (annotation only, no filtering)
    unique_org_ids = list(dict.fromkeys(w.organization_id for w in workspaces))
    tier_results = {r.organization_id: r for r in get_org_tiers(unique_org_ids)}
    for ws in workspaces:
        tier_result = tier_results.get(ws.organization_id)
        if tier_result:
            ws.customer_tier = tier_result.customer_tier
        ws.is_eu = ws.dataplane_name == "EU" if ws.dataplane_name else False

    return WorkspaceSearchResult(
        name_contains=name_contains,
        email_domain=email_domain.lstrip("@") if email_domain else None,
        total_workspaces_found=len(workspaces),
        unique_organization_ids=unique_org_ids,
        workspaces=workspaces,
    )


# Backward-compatible alias
query_prod_workspaces_by_email_domain = query_prod_workspaces


def _build_connector_stats(
    connector_definition_id: str,
    connector_type: str,
    canonical_name: str | None,
    rows: list[dict[str, Any]],
    version_tags: dict[str, str | None],
) -> ConnectorConnectionStats:
    """Build ConnectorConnectionStats from query result rows."""
    # Aggregate totals across all version groups
    total_connections = 0
    enabled_connections = 0
    active_connections = 0
    pinned_connections = 0
    unpinned_connections = 0
    total_succeeded = 0
    total_failed = 0
    total_cancelled = 0
    total_running = 0
    total_unknown = 0

    by_version: list[VersionPinStats] = []

    for row in rows:
        version_id = row.get("pinned_version_id")
        row_total = int(row.get("total_connections", 0))
        row_enabled = int(row.get("enabled_connections", 0))
        row_active = int(row.get("active_connections", 0))
        row_pinned = int(row.get("pinned_connections", 0))
        row_unpinned = int(row.get("unpinned_connections", 0))
        row_succeeded = int(row.get("succeeded_connections", 0))
        row_failed = int(row.get("failed_connections", 0))
        row_cancelled = int(row.get("cancelled_connections", 0))
        row_running = int(row.get("running_connections", 0))
        row_unknown = int(row.get("unknown_connections", 0))

        total_connections += row_total
        enabled_connections += row_enabled
        active_connections += row_active
        pinned_connections += row_pinned
        unpinned_connections += row_unpinned
        total_succeeded += row_succeeded
        total_failed += row_failed
        total_cancelled += row_cancelled
        total_running += row_running
        total_unknown += row_unknown

        by_version.append(
            VersionPinStats(
                pinned_version_id=str(version_id) if version_id else None,
                docker_image_tag=version_tags.get(str(version_id))
                if version_id
                else None,
                total_connections=row_total,
                enabled_connections=row_enabled,
                active_connections=row_active,
                latest_attempt=LatestAttemptBreakdown(
                    succeeded=row_succeeded,
                    failed=row_failed,
                    cancelled=row_cancelled,
                    running=row_running,
                    unknown=row_unknown,
                ),
            )
        )

    return ConnectorConnectionStats(
        connector_definition_id=connector_definition_id,
        connector_type=connector_type,
        canonical_name=canonical_name,
        total_connections=total_connections,
        enabled_connections=enabled_connections,
        active_connections=active_connections,
        pinned_connections=pinned_connections,
        unpinned_connections=unpinned_connections,
        latest_attempt=LatestAttemptBreakdown(
            succeeded=total_succeeded,
            failed=total_failed,
            cancelled=total_cancelled,
            running=total_running,
            unknown=total_unknown,
        ),
        by_version=by_version,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connector_connection_stats(
    source_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of source connector definition IDs (UUIDs) to get stats for. "
                "Example: ['afa734e4-3571-11ec-991a-1e0031268139']"
            ),
            default=None,
        ),
    ] = None,
    destination_definition_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "List of destination connector definition IDs (UUIDs) to get stats for. "
                "Example: ['94bd199c-2ff0-4aa2-b98e-17f0acb72610']"
            ),
            default=None,
        ),
    ] = None,
    lookback_days: Annotated[
        int,
        Field(
            description=(
                "Number of days to look back for 'active' connections (default: 7). "
                "Connections with sync activity within this window are counted as active."
            ),
            default=7,
        ),
    ] = 7,
) -> ConnectorConnectionStatsResponse:
    """Get aggregate connection stats for multiple connectors.

    Returns counts of connections grouped by pinned version for each connector,
    including:
    - Total, enabled, and active connection counts
    - Pinned vs unpinned breakdown
    - Latest attempt status breakdown (succeeded, failed, cancelled, running, unknown)

    This tool is designed for release monitoring workflows. It allows you to:
    1. Query recently released connectors to identify which ones to monitor
    2. Get aggregate stats showing how many connections are using each version
    3. See health metrics (pass/fail) broken down by version

    The `lookback_days` parameter controls the lookback window for:
    - Counting 'active' connections (those with recent sync activity)
    - Determining 'latest attempt status' (most recent attempt within the window)

    Connections with no sync activity in the lookback window will have
    'unknown' status in the latest_attempt breakdown.
    """
    # Initialize empty lists if None
    source_ids = source_definition_ids or []
    destination_ids = destination_definition_ids or []

    if not source_ids and not destination_ids:
        raise PyAirbyteInputError(
            message=(
                "At least one of source_definition_ids or destination_definition_ids "
                "must be provided."
            ),
        )

    sources: list[ConnectorConnectionStats] = []
    destinations: list[ConnectorConnectionStats] = []

    # Process source connectors
    for source_def_id in source_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(source_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_source_connection_stats(source_def_id, days=lookback_days)

        sources.append(
            _build_connector_stats(
                connector_definition_id=source_def_id,
                connector_type="source",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    # Process destination connectors
    for dest_def_id in destination_ids:
        # Get version info for tag lookup
        versions = query_connector_versions(dest_def_id)
        version_tags = {
            str(v["version_id"]): v.get("docker_image_tag") for v in versions
        }

        # Get aggregate stats
        rows = query_destination_connection_stats(dest_def_id, days=lookback_days)

        destinations.append(
            _build_connector_stats(
                connector_definition_id=dest_def_id,
                connector_type="destination",
                canonical_name=None,
                rows=rows,
                version_tags=version_tags,
            )
        )

    return ConnectorConnectionStatsResponse(
        sources=sources,
        destinations=destinations,
        lookback_days=lookback_days,
        generated_at=datetime.now(timezone.utc),
    )


# =============================================================================
# Connector Rollout Models and Tools
# =============================================================================


class ConnectorRolloutInfo(BaseModel):
    """Information about a connector rollout."""

    rollout_id: str = Field(description="The rollout UUID")
    actor_definition_id: str = Field(description="The connector definition UUID")
    state: str = Field(
        description="Rollout state: initialized, workflow_started, in_progress, "
        "paused, finalizing, succeeded, errored, failed_rolled_back, canceled"
    )
    initial_rollout_pct: int | None = Field(
        default=None, description="Initial rollout percentage"
    )
    current_target_rollout_pct: int | None = Field(
        default=None, description="Current target rollout percentage"
    )
    final_target_rollout_pct: int | None = Field(
        default=None, description="Final target rollout percentage"
    )
    has_breaking_changes: bool = Field(
        description="Whether the RC has breaking changes"
    )
    max_step_wait_time_mins: int | None = Field(
        default=None, description="Maximum wait time between rollout steps in minutes"
    )
    rollout_strategy: str | None = Field(
        default=None, description="Rollout strategy: manual, automated, overridden"
    )
    updated_by_user_id: str | None = Field(
        default=None,
        description="User ID recorded as last updating the rollout",
    )
    updated_by_user_name: str | None = Field(
        default=None,
        description="Name recorded as last updating the rollout",
    )
    updated_by_user_email: str | None = Field(
        default=None,
        description="Email recorded as last updating the rollout",
    )
    workflow_run_id: str | None = Field(
        default=None, description="Temporal workflow run ID"
    )
    error_msg: str | None = Field(default=None, description="Error message if errored")
    failed_reason: str | None = Field(
        default=None, description="Reason for failure if failed"
    )
    paused_reason: str | None = Field(
        default=None, description="Reason for pause if paused"
    )
    tag: str | None = Field(default=None, description="Optional tag for the rollout")
    created_at: datetime | None = Field(
        default=None, description="When the rollout was created"
    )
    updated_at: datetime | None = Field(
        default=None, description="When the rollout was last updated"
    )
    completed_at: datetime | None = Field(
        default=None, description="When the rollout completed (if terminal)"
    )
    expires_at: datetime | None = Field(
        default=None, description="When the rollout expires"
    )
    rc_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the release candidate"
    )
    rc_docker_repository: str | None = Field(
        default=None, description="Docker repository of the release candidate"
    )
    initial_docker_image_tag: str | None = Field(
        default=None, description="Docker image tag of the initial version"
    )
    initial_docker_repository: str | None = Field(
        default=None, description="Docker repository of the initial version"
    )
    filters: dict[str, Any] | None = Field(
        default=None,
        description="Raw rollout filters JSON (e.g., {'tierFilter': {'tier': 'TIER_0'}})",
    )
    customer_tier: str | None = Field(
        default=None,
        description="Customer tier targeted by this rollout (extracted from filters), "
        "e.g., 'TIER_0', 'TIER_1'. None if no tier filter is set.",
    )


def _parse_rollout_filters(filters_raw: Any) -> dict[str, Any] | None:
    """Parse the rollout filters field from a database row.

    The filters field may be a JSON string, a dict, or None.
    """
    if filters_raw is None:
        return None
    if isinstance(filters_raw, dict):
        return filters_raw
    if isinstance(filters_raw, str):
        try:
            parsed = json.loads(filters_raw)
            if isinstance(parsed, dict):
                return parsed
        except (json.JSONDecodeError, TypeError):
            pass
    return None


def _extract_tier_from_filters(filters_raw: Any) -> str | None:
    """Extract customer tier from rollout filters JSON.

    Supports two formats:
    - Legacy: `{"tierFilter": {"tier": "TIER_0"}}`
    - Current: `{"customerTierFilters": [{"name": "TIER", "value": ["TIER_1"], "operator": "IN"}]}`
    """
    parsed = _parse_rollout_filters(filters_raw)
    if parsed is None:
        return None

    # Current format: customerTierFilters list
    tier_filters = parsed.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and len(values) == 1:
                    return str(values[0])
                if isinstance(values, list) and len(values) > 1:
                    return ", ".join(str(v) for v in values)

    # Legacy format: tierFilter dict
    tier_filter = parsed.get("tierFilter")
    if isinstance(tier_filter, dict):
        tier = tier_filter.get("tier")
        if isinstance(tier, str):
            return tier

    return None


def _row_to_connector_rollout_info(row: dict[str, Any]) -> ConnectorRolloutInfo:
    """Convert a database row to a ConnectorRolloutInfo model."""
    return ConnectorRolloutInfo(
        rollout_id=str(row["rollout_id"]),
        actor_definition_id=str(row["actor_definition_id"]),
        state=row["state"],
        initial_rollout_pct=row.get("initial_rollout_pct"),
        current_target_rollout_pct=row.get("current_target_rollout_pct"),
        final_target_rollout_pct=row.get("final_target_rollout_pct"),
        has_breaking_changes=row["has_breaking_changes"],
        max_step_wait_time_mins=row.get("max_step_wait_time_mins"),
        rollout_strategy=row.get("rollout_strategy"),
        updated_by_user_id=str(row["updated_by_user_id"])
        if row.get("updated_by_user_id") is not None
        else None,
        updated_by_user_name=row.get("updated_by_user_name"),
        updated_by_user_email=row.get("updated_by_user_email"),
        workflow_run_id=row.get("workflow_run_id"),
        error_msg=row.get("error_msg"),
        failed_reason=row.get("failed_reason"),
        paused_reason=row.get("paused_reason"),
        tag=row.get("tag"),
        created_at=row.get("created_at"),
        updated_at=row.get("updated_at"),
        completed_at=row.get("completed_at"),
        expires_at=row.get("expires_at"),
        rc_docker_image_tag=row.get("rc_docker_image_tag"),
        rc_docker_repository=row.get("rc_docker_repository"),
        initial_docker_image_tag=row.get("initial_docker_image_tag"),
        initial_docker_repository=row.get("initial_docker_repository"),
        filters=_parse_rollout_filters(row.get("filters")),
        customer_tier=_extract_tier_from_filters(row.get("filters")),
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
)
def query_prod_connector_rollouts(
    actor_definition_id: Annotated[
        str | None,
        Field(description="Connector definition UUID to filter by (optional)"),
    ] = None,
    rollout_id: Annotated[
        str | None,
        Field(description="Specific rollout UUID to look up (optional)"),
    ] = None,
    active_only: Annotated[
        bool,
        Field(description="If true, only return active (non-terminal) rollouts"),
    ] = False,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (default: 100)"),
    ] = 100,
) -> list[ConnectorRolloutInfo]:
    """Query connector rollouts with flexible filtering.

    Returns rollouts based on the provided filters. If no filters are specified,
    returns all active rollouts. Useful for monitoring rollout status and history.

    Filter behavior:
    - rollout_id: Returns that specific rollout (ignores other filters)
    - active_only: Returns only active (non-terminal) rollouts
    - actor_definition_id: Returns rollouts for that specific connector
    - No filters: Returns all active rollouts (same as active_only=True)
    """
    rows = query_connector_rollouts(
        actor_definition_id=actor_definition_id,
        rollout_id=rollout_id,
        active_only=active_only,
        limit=limit,
    )
    return [_row_to_connector_rollout_info(row) for row in rows]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_prod_connection_sync_activity(
    start_at: Annotated[
        datetime,
        Field(
            description=(
                "Inclusive start timestamp for the sync activity window. "
                "Must be timezone-aware (ISO 8601 with offset or `Z`)."
            ),
        ),
    ],
    end_at: Annotated[
        datetime,
        Field(
            description=(
                "Exclusive end timestamp for the sync activity window. "
                "Must be timezone-aware and strictly after `start_at`."
            ),
        ),
    ],
    organization_id: Annotated[
        str | OrganizationAliasEnum | None,
        Field(
            description=(
                "Optional organization UUID or alias. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required. Accepts `@airbyte-internal` as an alias for the "
                "Airbyte internal org."
            ),
            default=None,
        ),
    ] = None,
    workspace_id: Annotated[
        str | WorkspaceAliasEnum | None,
        Field(
            description=(
                "Optional workspace UUID or alias. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required. Accepts `@devin-ai-sandbox` as an alias for the "
                "Devin AI sandbox workspace."
            ),
            default=None,
        ),
    ] = None,
    connection_ids: Annotated[
        list[str] | None,
        Field(
            description=(
                "Optional list of connection UUIDs. At least one of "
                "`organization_id`, `workspace_id`, or `connection_ids` is "
                "required."
            ),
            default=None,
        ),
    ] = None,
    status_filter: Annotated[
        StatusFilter,
        Field(
            description=(
                "Filter by job status: `all` (default), `succeeded`, or "
                "`failed`. Applied to `jobs.status` in the Prod DB Replica."
            ),
            default=StatusFilter.ALL,
        ),
    ] = StatusFilter.ALL,
    limit: Annotated[
        int,
        Field(
            description="Maximum number of attempt rows to return.",
            default=1000,
        ),
    ] = 1000,
) -> list[dict[str, Any]]:
    """List recent sync jobs and attempts from the Prod DB Replica.

    Returns one row per `(job, attempt)` pair for sync jobs whose `updated_at`
    falls in `[start_at, end_at)`, scoped to the provided organization,
    workspace, or connection IDs. Designed for live operational lookups —
    e.g. "what happened on this connection in the last hour" — not for
    historical analysis.

    Each row is enriched with `customer_tier` and `is_eu` for the owning
    organization. Tier filtering is intentionally not applied — this is a
    read-only observability query.

    Input requirements:
    - At least one of `organization_id`, `workspace_id`, or `connection_ids`
      must be provided (any combination is accepted).
    - `start_at` and `end_at` must be timezone-aware and `start_at < end_at`.

    Key fields in each row:
    - `job_id`, `attempt_id`, `attempt_number`
    - `job_status`, `attempt_status`
    - `job_started_at`, `job_updated_at`, `attempt_ended_at`
    - `failure_summary` (JSON; populated when an attempt failed)
    - `connection_id`, `connection_name`, `connection_status`
    - `source_actor_id`, `source_actor_name`, `source_actor_definition_id`
    - `destination_actor_id`, `destination_actor_name`,
      `destination_actor_definition_id`
    - `workspace_id`, `workspace_name`, `organization_id`
    - `dataplane_group_id`, `dataplane_name`
    - `customer_tier`, `is_eu` (added by tier enrichment)
    """
    resolved_organization_id = OrganizationAliasEnum.resolve(organization_id)
    resolved_workspace_id = WorkspaceAliasEnum.resolve(workspace_id)
    _validate_sync_activity_scope(
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
    )
    normalized_start_at, normalized_end_at = _validate_sync_activity_window(
        start_at=start_at,
        end_at=end_at,
    )

    rows = query_connection_sync_activity_from_prod(
        start_at=normalized_start_at,
        end_at=normalized_end_at,
        organization_id=resolved_organization_id,
        workspace_id=resolved_workspace_id,
        connection_ids=connection_ids,
        status_filter=status_filter.value,
        limit=limit,
    )
    return enrich_rows_by_org(rows)


# =============================================================================
# Pinned Connector Versions Models and Tools
# =============================================================================


class PinnedConnectorVersionInfo(BaseModel):
    """A connector version that has at least one scoped configuration pin."""

    version_id: str = Field(description="The actor_definition_version UUID")
    connector_definition_id: str = Field(description="The connector definition UUID")
    connector_name: str = Field(description="Human-readable connector name")
    docker_repository: str = Field(description="Docker repository path")
    docker_image_tag: str = Field(description="Docker image tag for this version")
    last_published: str | None = Field(
        default=None, description="ISO timestamp when this version was last published"
    )
    pin_count: int = Field(
        description="Total number of scoped_configuration rows pinning to this version"
    )
    breaking_change_pins: int = Field(
        default=0,
        description="Number of actor-scoped pins created by breaking changes",
    )
    rollout_pins: int = Field(
        default=0,
        description="Number of pins created by connector rollouts",
    )
    actor_pins: int = Field(
        description="Number of actor-scoped pins (excludes breaking change and rollout pins)"
    )
    workspace_pins: int = Field(description="Number of workspace-scoped pins")
    org_pins: int = Field(description="Number of organization-scoped pins")


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def query_connector_pin_stats(
    connector_definition_id: Annotated[
        str | None,
        Field(
            description="Connector definition UUID to filter by (optional). "
            "Mutually exclusive with `connector_canonical_name`."
        ),
    ] = None,
    connector_canonical_name: Annotated[
        str | None,
        Field(
            description="Connector canonical name (e.g. `source-postgres`) to filter by. "
            "Resolved to a definition ID via the registry. "
            "Mutually exclusive with `connector_definition_id`."
        ),
    ] = None,
) -> list[PinnedConnectorVersionInfo]:
    """Query connector versions that have at least one scoped configuration pin.

    Returns versions from the prod DB that are referenced by at least one
    `scoped_configuration` pin (`key = 'connector_version'`).
    Each version appears exactly once with per-scope pin breakdown (actor, workspace, org).

    If neither filter is provided, returns the global superset across all connectors.
    """
    if connector_definition_id and connector_canonical_name:
        raise PyAirbyteInputError(
            message=(
                "Provide at most one of `connector_definition_id` or "
                "`connector_canonical_name`, not both."
            ),
        )

    resolved_id: str | None = None
    if connector_canonical_name:
        resolved_id = resolve_canonical_name_to_definition_id(
            canonical_name=connector_canonical_name,
        )
    elif connector_definition_id:
        resolved_id = connector_definition_id

    rows = query_versions_with_pins(actor_definition_id=resolved_id)
    return [
        PinnedConnectorVersionInfo(
            version_id=str(row["version_id"]),
            connector_definition_id=str(row["connector_definition_id"]),
            connector_name=row["connector_name"],
            docker_repository=row["docker_repository"],
            docker_image_tag=row["docker_image_tag"],
            last_published=(
                row["last_published"].isoformat() if row.get("last_published") else None
            ),
            pin_count=row["pin_count"],
            breaking_change_pins=row.get("breaking_change_pins", 0),
            rollout_pins=row.get("rollout_pins", 0),
            actor_pins=row.get("actor_pins", 0),
            workspace_pins=row.get("workspace_pins", 0),
            org_pins=row.get("org_pins", 0),
        )
        for row in rows
    ]


def register_prod_db_query_tools(app: FastMCP) -> None:
    """Register prod DB query tools with the FastMCP app."""
    register_mcp_tools(app, mcp_module=__name__)
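
Example: how customer_tier is derived from rollout filters

The source above documents two filter shapes for `_extract_tier_from_filters` (legacy `tierFilter` and current `customerTierFilters`). The following is a minimal standalone sketch of that behavior, useful for seeing what `customer_tier` will contain for a given `filters` value. The function name `extract_tier` and the sample inputs are illustrative assumptions, not part of the module; the logic is condensed from the listing and is not the module's own code.

```python
from __future__ import annotations

import json
from typing import Any


def extract_tier(filters_raw: Any) -> str | None:
    """Illustrative condensation of the two private helpers in the listing."""
    # A JSON string is decoded first; undecodable strings count as "no filters".
    if isinstance(filters_raw, str):
        try:
            filters_raw = json.loads(filters_raw)
        except json.JSONDecodeError:
            return None
    # Anything that is not a dict at this point (None, list, number) is ignored.
    if not isinstance(filters_raw, dict):
        return None

    # Current format is checked first. The first "TIER" entry with a
    # non-empty value list wins; multiple values are comma-joined.
    tier_filters = filters_raw.get("customerTierFilters")
    if isinstance(tier_filters, list):
        for entry in tier_filters:
            if isinstance(entry, dict) and entry.get("name") == "TIER":
                values = entry.get("value")
                if isinstance(values, list) and values:
                    return ", ".join(str(v) for v in values)

    # Legacy format: only a string tier is accepted.
    tier_filter = filters_raw.get("tierFilter")
    if isinstance(tier_filter, dict) and isinstance(tier_filter.get("tier"), str):
        return tier_filter["tier"]
    return None


# Hypothetical sample inputs covering both formats.
legacy = {"tierFilter": {"tier": "TIER_0"}}
current = (
    '{"customerTierFilters": '
    '[{"name": "TIER", "value": ["TIER_1", "TIER_2"], "operator": "IN"}]}'
)

print(extract_tier(legacy))      # TIER_0
print(extract_tier(current))     # TIER_1, TIER_2
print(extract_tier("not json"))  # None
```

Note that a rollout targeting several tiers yields one comma-joined string rather than a list, so a caller comparing `customer_tier` against a single tier name should probably split on `", "` first.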