airbyte.mcp.cloud

Airbyte Cloud MCP operations.

cloud module

MCP primitives registered by the cloud module of the airbyte-mcp server: 35 tool(s), 0 prompt(s), 0 resource(s).

Tools (35)

check_airbyte_cloud_workspace

Hints: read-only · idempotent · open-world

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, organization info, and billing status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Information about a workspace in Airbyte Cloud.",
  "properties": {
    "workspace_id": {
      "type": "string"
    },
    "workspace_name": {
      "type": "string"
    },
    "workspace_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "organization_id": {
      "type": "string"
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "payment_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "subscription_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "is_account_locked": {
      "default": false,
      "type": "boolean"
    }
  },
  "required": [
    "workspace_id",
    "workspace_name",
    "organization_id"
  ],
  "type": "object"
}

create_connection_on_cloud

Hints: open-world

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_name string yes The name of the connection.
source_id string yes The ID of the deployed source.
destination_id string yes The ID of the deployed destination.
selected_streams string | array<string> yes The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
table_prefix string | null no null Optional table prefix to use when syncing to the destination.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_name": {
      "description": "The name of the connection.",
      "type": "string"
    },
    "source_id": {
      "description": "The ID of the deployed source.",
      "type": "string"
    },
    "destination_id": {
      "description": "The ID of the deployed destination.",
      "type": "string"
    },
    "selected_streams": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      ],
      "description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "table_prefix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional table prefix to use when syncing to the destination."
    }
  },
  "required": [
    "connection_name",
    "source_id",
    "destination_id",
    "selected_streams"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_destination_to_cloud

Hints: open-world

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_name string yes The name to use when deploying the destination.
destination_connector_name string yes The name of the destination connector (e.g., 'destination-postgres').
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
config object | string | null no null The configuration for the destination connector.
config_secret_name string | null no null The name of the secret containing the configuration.
unique boolean no true Whether to require a unique name.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_name": {
      "description": "The name to use when deploying the destination.",
      "type": "string"
    },
    "destination_connector_name": {
      "description": "The name of the destination connector (e.g., 'destination-postgres').",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the destination connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    }
  },
  "required": [
    "destination_name",
    "destination_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_noop_destination_to_cloud

Hints: open-world

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
name string no "No-op Destination"
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
unique boolean no true

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name": {
      "default": "No-op Destination",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "unique": {
      "default": true,
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_source_to_cloud

Hints: open-world

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_name string yes The name to use when deploying the source.
source_connector_name string yes The name of the source connector (e.g., 'source-faker').
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
config object | string | null no null The configuration for the source connector.
config_secret_name string | null no null The name of the secret containing the configuration.
unique boolean no true Whether to require a unique name.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_name": {
      "description": "The name to use when deploying the source.",
      "type": "string"
    },
    "source_connector_name": {
      "description": "The name of the source connector (e.g., 'source-faker').",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    }
  },
  "required": [
    "source_name",
    "source_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

describe_cloud_connection

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed connection.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed connection in Airbyte Cloud.",
  "properties": {
    "connection_id": {
      "type": "string"
    },
    "connection_name": {
      "type": "string"
    },
    "connection_url": {
      "type": "string"
    },
    "source_id": {
      "type": "string"
    },
    "source_name": {
      "type": "string"
    },
    "destination_id": {
      "type": "string"
    },
    "destination_name": {
      "type": "string"
    },
    "selected_streams": {
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "table_prefix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "connection_id",
    "connection_name",
    "connection_url",
    "source_id",
    "source_name",
    "destination_id",
    "destination_name",
    "selected_streams",
    "table_prefix"
  ],
  "type": "object"
}

describe_cloud_destination

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed destination connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the destination to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the destination to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed destination connector in Airbyte Cloud.",
  "properties": {
    "destination_id": {
      "type": "string"
    },
    "destination_name": {
      "type": "string"
    },
    "destination_url": {
      "type": "string"
    },
    "connector_definition_id": {
      "type": "string"
    }
  },
  "required": [
    "destination_id",
    "destination_name",
    "destination_url",
    "connector_definition_id"
  ],
  "type": "object"
}

describe_cloud_organization

Hints: read-only · idempotent · open-world

Get details about a specific organization including billing status.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
organization_id string | null no null Organization ID. Required if organization_name is not provided.
organization_name string | null no null Organization name (exact match). Required if organization_id is not provided.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization ID. Required if organization_name is not provided."
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization name (exact match). Required if organization_id is not provided."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Information about an organization in Airbyte Cloud.",
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "payment_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "subscription_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "is_account_locked": {
      "default": false,
      "type": "boolean"
    }
  },
  "required": [
    "id",
    "name",
    "email"
  ],
  "type": "object"
}

describe_cloud_source

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed source connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the source to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the source to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed source connector in Airbyte Cloud.",
  "properties": {
    "source_id": {
      "type": "string"
    },
    "source_name": {
      "type": "string"
    },
    "source_url": {
      "type": "string"
    },
    "connector_definition_id": {
      "type": "string"
    }
  },
  "required": [
    "source_id",
    "source_name",
    "source_url",
    "connector_definition_id"
  ],
  "type": "object"
}

get_cloud_sync_logs

Hints: read-only · idempotent · open-world

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
job_id integer | null | null no null
attempt_number integer | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
max_lines integer no 4000 Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.
from_tail boolean | null no null Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if line_offset is not specified. Cannot combine from_tail=True with line_offset.
line_offset integer | null no null Number of lines to skip from the beginning of the logs. Cannot be combined with from_tail=True.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "job_id": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "description": "Optional job ID. If not provided, the latest job will be used."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "attempt_number": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "description": "Optional attempt number. If not provided, the latest attempt will be used."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "max_lines": {
      "default": 4000,
      "description": "Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.",
      "type": "integer"
    },
    "from_tail": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`."
    },
    "line_offset": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading sync logs with pagination support.",
  "properties": {
    "job_id": {
      "type": "integer"
    },
    "attempt_number": {
      "type": "integer"
    },
    "log_text": {
      "type": "string"
    },
    "log_text_start_line": {
      "type": "integer"
    },
    "log_text_line_count": {
      "type": "integer"
    },
    "total_log_lines_available": {
      "type": "integer"
    }
  },
  "required": [
    "job_id",
    "attempt_number",
    "log_text",
    "log_text_start_line",
    "log_text_line_count",
    "total_log_lines_available"
  ],
  "type": "object"
}

get_cloud_sync_status

Hints: read-only · idempotent · open-world

Get the status of a sync job from the Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
job_id integer | null no null Optional job ID. If not provided, the latest job will be used.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
include_attempts boolean no false Whether to include detailed attempts information.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "job_id": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional job ID. If not provided, the latest job will be used."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "include_attempts": {
      "default": false,
      "description": "Whether to include detailed attempts information.",
      "type": "boolean"
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_connection_artifact

Hints: read-only · idempotent · open-world

Get a connection artifact (state or catalog) from Airbyte Cloud.

By default, returns artifacts in Airbyte protocol format (snake_case,
suitable for passing to connector CLI flags like `--state` or `--catalog`).

Retrieves the specified artifact for a connection:
- `state`: Returns a list of protocol-format `AirbyteStateMessage` dicts,
  or `{"ERROR": "..."}` if no state is set.
- `catalog`: Returns the protocol-format `ConfiguredAirbyteCatalog` dict,
  or `{"ERROR": "..."}` if not found.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
artifact_type enum("state", "catalog") yes The type of artifact to retrieve: 'state' or 'catalog'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "artifact_type": {
      "description": "The type of artifact to retrieve: 'state' or 'catalog'.",
      "enum": [
        "state",
        "catalog"
      ],
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "artifact_type"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "items": {
            "additionalProperties": true,
            "type": "object"
          },
          "type": "array"
        }
      ]
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

get_connector_builder_draft_manifest

Hints: read-only · idempotent · open-world

Get the Connector Builder draft manifest for a custom source definition.

Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.

If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to retrieve the draft for.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to retrieve the draft for.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_custom_source_definition

Hints: read-only · idempotent · open-world

Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to retrieve.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
include_draft boolean no false Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to retrieve.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "include_draft": {
      "default": false,
      "description": "Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.",
      "type": "boolean"
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

list_cloud_sync_jobs

Hints: read-only · idempotent · open-world

List sync jobs for a connection with limit support.

This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and result limit. By default, jobs are returned
newest-first (`from_tail=True`).

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
max_jobs integer no 20 Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.
from_tail boolean | null no null When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True.
job_type enum("sync", "reset", "refresh", "clear") | null no null Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "max_jobs": {
      "default": 20,
      "description": "Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.",
      "type": "integer"
    },
    "from_tail": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True."
    },
    "job_type": {
      "anyOf": [
        {
          "description": "Enum that describes the different types of jobs that the platform runs.",
          "enum": [
            "sync",
            "reset",
            "refresh",
            "clear"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing sync jobs with limit support.",
  "properties": {
    "jobs": {
      "items": {
        "description": "Information about a sync job.",
        "properties": {
          "job_id": {
            "type": "integer"
          },
          "status": {
            "type": "string"
          },
          "bytes_synced": {
            "type": "integer"
          },
          "records_synced": {
            "type": "integer"
          },
          "start_time": {
            "type": "string"
          },
          "job_url": {
            "type": "string"
          }
        },
        "required": [
          "job_id",
          "status",
          "bytes_synced",
          "records_synced",
          "start_time",
          "job_url"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "jobs_count": {
      "type": "integer"
    },
    "from_tail": {
      "type": "boolean"
    }
  },
  "required": [
    "jobs",
    "jobs_count",
    "from_tail"
  ],
  "type": "object"
}

list_cloud_workspaces

Hints: read-only · idempotent · open-world

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
organization_id string | null no null Organization ID. Required if organization_name is not provided.
organization_name string | null no null Organization name (exact match). Required if organization_id is not provided.
name_contains string | null no null Optional substring to filter workspaces by name (server-side filtering)
limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization ID. Required if organization_name is not provided."
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization name (exact match). Required if organization_id is not provided."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional substring to filter workspaces by name (server-side filtering)"
    },
    "limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a workspace in Airbyte Cloud.",
        "properties": {
          "workspace_id": {
            "type": "string"
          },
          "workspace_name": {
            "type": "string"
          },
          "workspace_url": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "organization_id": {
            "type": "string"
          },
          "organization_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "payment_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "subscription_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "is_account_locked": {
            "default": false,
            "type": "boolean"
          }
        },
        "required": [
          "workspace_id",
          "workspace_name",
          "organization_id"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_custom_source_definitions

Hints: read-only · idempotent · open-world

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_connections

Hints: read-only · idempotent · open-world

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter connections by name
limit integer | null no null Optional maximum number of items to return (default: no limit)
with_connection_status boolean | null no false If True, include status info for each connection's most recent sync job
failing_connections_only boolean | null no false If True, only return connections with failed/cancelled last sync

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter connections by name"
    },
    "limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    },
    "with_connection_status": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": false,
      "description": "If True, include status info for each connection's most recent sync job"
    },
    "failing_connections_only": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": false,
      "description": "If True, only return connections with failed/cancelled last sync"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed connection in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          },
          "source_id": {
            "type": "string"
          },
          "destination_id": {
            "type": "string"
          },
          "last_job_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "last_job_id": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "last_job_time": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "currently_running_job_id": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "currently_running_job_start_time": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          }
        },
        "required": [
          "id",
          "name",
          "url",
          "source_id",
          "destination_id"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_destination_connectors

Hints: read-only · idempotent · open-world

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter destinations by name
limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter destinations by name"
    },
    "limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed destination connector in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          }
        },
        "required": [
          "id",
          "name",
          "url"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_source_connectors

Hints: read-only · idempotent · open-world

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter sources by name
limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter sources by name"
    },
    "limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed source connector in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          }
        },
        "required": [
          "id",
          "name",
          "url"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_connection

Hints: destructive · open-world

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.

The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to delete.
name string yes The expected name of the connection (for verification).
cascade_delete_source boolean no false Whether to also delete the source connector associated with this connection.
cascade_delete_destination boolean no false Whether to also delete the destination connector associated with this connection.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the connection (for verification).",
      "type": "string"
    },
    "cascade_delete_source": {
      "default": false,
      "description": "Whether to also delete the source connector associated with this connection.",
      "type": "boolean"
    },
    "cascade_delete_destination": {
      "default": false,
      "description": "Whether to also delete the destination connector associated with this connection.",
      "type": "boolean"
    }
  },
  "required": [
    "connection_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_destination

Hints: destructive · open-world

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.

The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to delete.
name string yes The expected name of the destination (for verification).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the destination (for verification).",
      "type": "string"
    }
  },
  "required": [
    "destination_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_source

Hints: destructive · open-world

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.

The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to delete.
name string yes The expected name of the source (for verification).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the source (for verification).",
      "type": "string"
    }
  },
  "required": [
    "source_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_custom_source_definition

Hints: destructive · open-world

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to delete.
name string yes The expected name of the custom source definition (for verification).
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the custom source definition (for verification).",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "definition_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

publish_custom_source_definition

Hints: open-world

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
name string yes The name for the custom connector definition.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
manifest_yaml string | string | null | null no null
unique boolean no true Whether to require a unique name.
pre_validate boolean no true Whether to validate the manifest client-side before publishing.
testing_values object | string | null no null Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.
testing_values_secret_name string | null no null Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name": {
      "description": "The name for the custom connector definition.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "manifest_yaml": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "path",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    },
    "pre_validate": {
      "default": true,
      "description": "Whether to validate the manifest client-side before publishing.",
      "type": "boolean"
    },
    "testing_values": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations."
    },
    "testing_values_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
    }
  },
  "required": [
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_connection

Hints: open-world

Rename a connection on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to rename.
name string yes New name for the connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_destination

Hints: open-world

Rename a deployed destination connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to rename.
name string yes New name for the destination.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the destination.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_source

Hints: open-world

Rename a deployed source connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to rename.
name string yes New name for the source.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the source.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

run_cloud_sync

Hints: open-world

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
wait boolean no false Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.
wait_timeout integer no 300 Maximum time to wait for sync completion (seconds).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "wait": {
      "default": false,
      "description": "Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.",
      "type": "boolean"
    },
    "wait_timeout": {
      "default": 300,
      "description": "Maximum time to wait for sync completion (seconds).",
      "type": "integer"
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

set_cloud_connection_selected_streams

Hints: destructive · open-world

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
stream_names string | array<string> yes The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "stream_names": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      ],
      "description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "stream_names"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

set_cloud_connection_table_prefix

Hints: destructive · open-world

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
prefix string yes New table prefix to use when syncing to the destination.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "prefix": {
      "description": "New table prefix to use when syncing to the destination.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "prefix"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_connection

Hints: destructive · open-world

Update a connection's settings on Airbyte Cloud.

This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)

At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
enabled boolean | null no null Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status.
cron_expression string | null no null A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'.
manual_schedule boolean | null no null Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "enabled": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status."
    },
    "cron_expression": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'."
    },
    "manual_schedule": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_destination_config

Hints: destructive · open-world

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to update.
config object | string yes New configuration for the destination connector.
config_secret_name string | null no null The name of the secret containing the configuration.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to update.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        }
      ],
      "description": "New configuration for the destination connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id",
    "config"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_source_config

Hints: destructive · open-world

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to update.
config object | string yes New configuration for the source connector.
config_secret_name string | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to update.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        }
      ],
      "description": "New configuration for the source connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "The name of the secret containing the configuration."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id",
    "config"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_custom_source_definition

Hints: destructive · open-world

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the definition to update.
manifest_yaml string | string | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
pre_validate boolean no true Whether to validate the manifest client-side before updating.
testing_values object | string | null no null Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.
testing_values_secret_name string | null no null Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the definition to update.",
      "type": "string"
    },
    "manifest_yaml": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "path",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "New manifest as YAML string or file path. Optional; omit to update only testing values."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "pre_validate": {
      "default": true,
      "description": "Whether to validate the manifest client-side before updating.",
      "type": "boolean"
    },
    "testing_values": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist."
    },
    "testing_values_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations.
   3
   4.. include:: ../../docs/mcp-generated/cloud.md
   5"""
   6
   7# No public Python API — MCP primitives are registered via decorators and
   8# documented via the generated Markdown include above. Setting `__all__` to an
   9# empty list tells pdoc (and other doc tools) not to surface the individual
  10# tool / helper definitions as a redundant "API Documentation" list.
  11__all__: list[str] = []
  12
  13from pathlib import Path
  14from typing import Annotated, Any, Literal, cast
  15
  16from airbyte_api.models import JobTypeEnum
  17from fastmcp import Context, FastMCP
  18from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
  19from pydantic import BaseModel, Field
  20
  21from airbyte import cloud, get_destination, get_source
  22from airbyte._util import api_util
  23from airbyte.cloud.client import CloudClient
  24from airbyte.cloud.connectors import CustomCloudSourceDefinition
  25from airbyte.cloud.constants import FAILED_STATUSES
  26from airbyte.cloud.workspaces import CloudWorkspace
  27from airbyte.constants import (
  28    MCP_CONFIG_API_URL,
  29    MCP_CONFIG_BEARER_TOKEN,
  30    MCP_CONFIG_CLIENT_ID,
  31    MCP_CONFIG_CLIENT_SECRET,
  32    MCP_CONFIG_CONFIG_API_URL,
  33    MCP_CONFIG_WORKSPACE_ID,
  34)
  35from airbyte.destinations.util import get_noop_destination
  36from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  37from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
  38from airbyte.mcp._tool_utils import (
  39    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
  40    check_guid_created_in_session,
  41    register_guid_created_in_session,
  42)
  43
  44
  45CLOUD_AUTH_TIP_TEXT = (
  46    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
  47    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
  48    "will be used to authenticate with the Airbyte Cloud API."
  49)
  50WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  51
  52
  53class CloudSourceResult(BaseModel):
  54    """Information about a deployed source connector in Airbyte Cloud."""
  55
  56    id: str
  57    """The source ID."""
  58    name: str
  59    """Display name of the source."""
  60    url: str
  61    """Web URL for managing this source in Airbyte Cloud."""
  62
  63
  64class CloudDestinationResult(BaseModel):
  65    """Information about a deployed destination connector in Airbyte Cloud."""
  66
  67    id: str
  68    """The destination ID."""
  69    name: str
  70    """Display name of the destination."""
  71    url: str
  72    """Web URL for managing this destination in Airbyte Cloud."""
  73
  74
  75class CloudConnectionResult(BaseModel):
  76    """Information about a deployed connection in Airbyte Cloud."""
  77
  78    id: str
  79    """The connection ID."""
  80    name: str
  81    """Display name of the connection."""
  82    url: str
  83    """Web URL for managing this connection in Airbyte Cloud."""
  84    source_id: str
  85    """ID of the source used by this connection."""
  86    destination_id: str
  87    """ID of the destination used by this connection."""
  88    last_job_status: str | None = None
  89    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
  90    Only populated when with_connection_status=True."""
  91    last_job_id: int | None = None
  92    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
  93    last_job_time: str | None = None
  94    """ISO 8601 timestamp of the most recent completed sync.
  95    Only populated when with_connection_status=True."""
  96    currently_running_job_id: int | None = None
  97    """Job ID of a currently running sync, if any.
  98    Only populated when with_connection_status=True."""
  99    currently_running_job_start_time: str | None = None
 100    """ISO 8601 timestamp of when the currently running sync started.
 101    Only populated when with_connection_status=True."""
 102
 103
 104class CloudSourceDetails(BaseModel):
 105    """Detailed information about a deployed source connector in Airbyte Cloud."""
 106
 107    source_id: str
 108    """The source ID."""
 109    source_name: str
 110    """Display name of the source."""
 111    source_url: str
 112    """Web URL for managing this source in Airbyte Cloud."""
 113    connector_definition_id: str
 114    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 115
 116
 117class CloudDestinationDetails(BaseModel):
 118    """Detailed information about a deployed destination connector in Airbyte Cloud."""
 119
 120    destination_id: str
 121    """The destination ID."""
 122    destination_name: str
 123    """Display name of the destination."""
 124    destination_url: str
 125    """Web URL for managing this destination in Airbyte Cloud."""
 126    connector_definition_id: str
 127    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 128
 129
 130class CloudConnectionDetails(BaseModel):
 131    """Detailed information about a deployed connection in Airbyte Cloud."""
 132
 133    connection_id: str
 134    """The connection ID."""
 135    connection_name: str
 136    """Display name of the connection."""
 137    connection_url: str
 138    """Web URL for managing this connection in Airbyte Cloud."""
 139    source_id: str
 140    """ID of the source used by this connection."""
 141    source_name: str
 142    """Display name of the source."""
 143    destination_id: str
 144    """ID of the destination used by this connection."""
 145    destination_name: str
 146    """Display name of the destination."""
 147    selected_streams: list[str]
 148    """List of stream names selected for syncing."""
 149    table_prefix: str | None
 150    """Table prefix applied when syncing to the destination."""
 151
 152
 153class CloudOrganizationResult(BaseModel):
 154    """Information about an organization in Airbyte Cloud."""
 155
 156    id: str
 157    """The organization ID."""
 158    name: str
 159    """Display name of the organization."""
 160    email: str
 161    """Email associated with the organization."""
 162    payment_status: str | None = None
 163    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
 164    When 'disabled', syncs are blocked due to unpaid invoices."""
 165    subscription_status: str | None = None
 166    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
 167    'unsubscribed')."""
 168    is_account_locked: bool = False
 169    """Whether the account is locked due to billing issues.
 170    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
 171    Defaults to False unless we have affirmative evidence of a locked state."""
 172
 173
 174class CloudWorkspaceResult(BaseModel):
 175    """Information about a workspace in Airbyte Cloud."""
 176
 177    workspace_id: str
 178    """The workspace ID."""
 179    workspace_name: str
 180    """Display name of the workspace."""
 181    workspace_url: str | None = None
 182    """URL to access the workspace in Airbyte Cloud."""
 183    organization_id: str
 184    """ID of the organization (requires ORGANIZATION_READER permission)."""
 185    organization_name: str | None = None
 186    """Name of the organization (requires ORGANIZATION_READER permission)."""
 187    payment_status: str | None = None
 188    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
 189    When 'disabled', syncs are blocked due to unpaid invoices.
 190    Requires ORGANIZATION_READER permission."""
 191    subscription_status: str | None = None
 192    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
 193    'unsubscribed'). Requires ORGANIZATION_READER permission."""
 194    is_account_locked: bool = False
 195    """Whether the account is locked due to billing issues.
 196    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
 197    Defaults to False unless we have affirmative evidence of a locked state.
 198    Requires ORGANIZATION_READER permission."""
 199
 200
 201class LogReadResult(BaseModel):
 202    """Result of reading sync logs with pagination support."""
 203
 204    job_id: int
 205    """The job ID the logs belong to."""
 206    attempt_number: int
 207    """The attempt number the logs belong to."""
 208    log_text: str
 209    """The string containing the log text we are returning."""
 210    log_text_start_line: int
 211    """1-based line index of the first line returned."""
 212    log_text_line_count: int
 213    """Count of lines we are returning."""
 214    total_log_lines_available: int
 215    """Total number of log lines available, shows if any lines were missed due to the limit."""
 216
 217
 218class SyncJobResult(BaseModel):
 219    """Information about a sync job."""
 220
 221    job_id: int
 222    """The job ID."""
 223    status: str
 224    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
 225    bytes_synced: int
 226    """Number of bytes synced in this job."""
 227    records_synced: int
 228    """Number of records synced in this job."""
 229    start_time: str
 230    """ISO 8601 timestamp of when the job started."""
 231    job_url: str
 232    """URL to view the job in Airbyte Cloud."""
 233
 234
 235class SyncJobListResult(BaseModel):
 236    """Result of listing sync jobs with limit support."""
 237
 238    jobs: list[SyncJobResult]
 239    """List of sync jobs."""
 240    jobs_count: int
 241    """Number of jobs returned in this response."""
 242    from_tail: bool
 243    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 244
 245
 246def _get_cloud_workspace(
 247    ctx: Context,
 248    workspace_id: str | None = None,
 249) -> CloudWorkspace:
 250    """Get an authenticated CloudWorkspace.
 251
 252    Resolves credentials from multiple sources via MCP config args in order:
 253    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 254    2. Environment variables
 255
 256    The ctx parameter provides access to MCP config values that are resolved
 257    from HTTP headers or environment variables based on the config args
 258    defined in server.py.
 259    """
 260    resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID)
 261    if not resolved_workspace_id:
 262        raise PyAirbyteInputError(
 263            message="Workspace ID is required but not provided.",
 264            guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.",
 265        )
 266
 267    return _get_cloud_client(ctx).get_workspace(resolved_workspace_id)
 268
 269
 270def _get_cloud_client(
 271    ctx: Context,
 272    *,
 273    organization_id: str | None = None,
 274) -> CloudClient:
 275    """Get an authenticated `CloudClient` from MCP config."""
 276    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
 277    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
 278    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
 279    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL)
 280    config_api_url = get_mcp_config(ctx, MCP_CONFIG_CONFIG_API_URL)
 281
 282    return CloudClient(
 283        client_id=client_id,
 284        client_secret=client_secret,
 285        bearer_token=bearer_token,
 286        public_api_root=api_url,
 287        config_api_root=config_api_url,
 288        organization_id=organization_id,
 289    )
 290
 291
 292@mcp_tool(
 293    open_world=True,
 294    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 295)
 296def deploy_source_to_cloud(
 297    ctx: Context,
 298    source_name: Annotated[
 299        str,
 300        Field(description="The name to use when deploying the source."),
 301    ],
 302    source_connector_name: Annotated[
 303        str,
 304        Field(description="The name of the source connector (e.g., 'source-faker')."),
 305    ],
 306    *,
 307    workspace_id: Annotated[
 308        str | None,
 309        Field(
 310            description=WORKSPACE_ID_TIP_TEXT,
 311            default=None,
 312        ),
 313    ],
 314    config: Annotated[
 315        dict | str | None,
 316        Field(
 317            description="The configuration for the source connector.",
 318            default=None,
 319        ),
 320    ],
 321    config_secret_name: Annotated[
 322        str | None,
 323        Field(
 324            description="The name of the secret containing the configuration.",
 325            default=None,
 326        ),
 327    ],
 328    unique: Annotated[
 329        bool,
 330        Field(
 331            description="Whether to require a unique name.",
 332            default=True,
 333        ),
 334    ],
 335) -> str:
 336    """Deploy a source connector to Airbyte Cloud."""
 337    source = get_source(
 338        source_connector_name,
 339        no_executor=True,
 340    )
 341    config_dict = resolve_connector_config(
 342        config=config,
 343        config_secret_name=config_secret_name,
 344        config_spec_jsonschema=source.config_spec,
 345    )
 346    source.set_config(config_dict, validate=True)
 347
 348    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 349    deployed_source = workspace.deploy_source(
 350        name=source_name,
 351        source=source,
 352        unique=unique,
 353    )
 354
 355    register_guid_created_in_session(deployed_source.connector_id)
 356    return (
 357        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 358        f" and URL: {deployed_source.connector_url}"
 359    )
 360
 361
 362@mcp_tool(
 363    open_world=True,
 364    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 365)
 366def deploy_destination_to_cloud(
 367    ctx: Context,
 368    destination_name: Annotated[
 369        str,
 370        Field(description="The name to use when deploying the destination."),
 371    ],
 372    destination_connector_name: Annotated[
 373        str,
 374        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 375    ],
 376    *,
 377    workspace_id: Annotated[
 378        str | None,
 379        Field(
 380            description=WORKSPACE_ID_TIP_TEXT,
 381            default=None,
 382        ),
 383    ],
 384    config: Annotated[
 385        dict | str | None,
 386        Field(
 387            description="The configuration for the destination connector.",
 388            default=None,
 389        ),
 390    ],
 391    config_secret_name: Annotated[
 392        str | None,
 393        Field(
 394            description="The name of the secret containing the configuration.",
 395            default=None,
 396        ),
 397    ],
 398    unique: Annotated[
 399        bool,
 400        Field(
 401            description="Whether to require a unique name.",
 402            default=True,
 403        ),
 404    ],
 405) -> str:
 406    """Deploy a destination connector to Airbyte Cloud."""
 407    destination = get_destination(
 408        destination_connector_name,
 409        no_executor=True,
 410    )
 411    config_dict = resolve_connector_config(
 412        config=config,
 413        config_secret_name=config_secret_name,
 414        config_spec_jsonschema=destination.config_spec,
 415    )
 416    destination.set_config(config_dict, validate=True)
 417
 418    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 419    deployed_destination = workspace.deploy_destination(
 420        name=destination_name,
 421        destination=destination,
 422        unique=unique,
 423    )
 424
 425    register_guid_created_in_session(deployed_destination.connector_id)
 426    return (
 427        f"Successfully deployed destination '{destination_name}' "
 428        f"with ID: {deployed_destination.connector_id}"
 429    )
 430
 431
 432@mcp_tool(
 433    open_world=True,
 434    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 435)
 436def create_connection_on_cloud(
 437    ctx: Context,
 438    connection_name: Annotated[
 439        str,
 440        Field(description="The name of the connection."),
 441    ],
 442    source_id: Annotated[
 443        str,
 444        Field(description="The ID of the deployed source."),
 445    ],
 446    destination_id: Annotated[
 447        str,
 448        Field(description="The ID of the deployed destination."),
 449    ],
 450    selected_streams: Annotated[
 451        str | list[str],
 452        Field(
 453            description=(
 454                "The selected stream names to sync within the connection. "
 455                "Must be an explicit stream name or list of streams. "
 456                "Cannot be empty or '*'."
 457            )
 458        ),
 459    ],
 460    *,
 461    workspace_id: Annotated[
 462        str | None,
 463        Field(
 464            description=WORKSPACE_ID_TIP_TEXT,
 465            default=None,
 466        ),
 467    ],
 468    table_prefix: Annotated[
 469        str | None,
 470        Field(
 471            description="Optional table prefix to use when syncing to the destination.",
 472            default=None,
 473        ),
 474    ],
 475) -> str:
 476    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 477    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 478    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 479    deployed_connection = workspace.deploy_connection(
 480        connection_name=connection_name,
 481        source=source_id,
 482        destination=destination_id,
 483        selected_streams=resolved_streams_list,
 484        table_prefix=table_prefix,
 485    )
 486
 487    register_guid_created_in_session(deployed_connection.connection_id)
 488    return (
 489        f"Successfully created connection '{connection_name}' "
 490        f"with ID '{deployed_connection.connection_id}' and "
 491        f"URL: {deployed_connection.connection_url}"
 492    )
 493
 494
 495@mcp_tool(
 496    open_world=True,
 497    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 498)
 499def run_cloud_sync(
 500    ctx: Context,
 501    connection_id: Annotated[
 502        str,
 503        Field(description="The ID of the Airbyte Cloud connection."),
 504    ],
 505    *,
 506    workspace_id: Annotated[
 507        str | None,
 508        Field(
 509            description=WORKSPACE_ID_TIP_TEXT,
 510            default=None,
 511        ),
 512    ],
 513    wait: Annotated[
 514        bool,
 515        Field(
 516            description=(
 517                "Whether to wait for the sync to complete. Since a sync can take between several "
 518                "minutes and several hours, this option is not recommended for most "
 519                "scenarios."
 520            ),
 521            default=False,
 522        ),
 523    ],
 524    wait_timeout: Annotated[
 525        int,
 526        Field(
 527            description="Maximum time to wait for sync completion (seconds).",
 528            default=300,
 529        ),
 530    ],
 531) -> str:
 532    """Run a sync job on Airbyte Cloud."""
 533    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 534    connection = workspace.get_connection(connection_id=connection_id)
 535    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 536
 537    if wait:
 538        status = sync_result.get_job_status()
 539        return (
 540            f"Sync completed with status: {status}. "
 541            f"Job ID is '{sync_result.job_id}' and "
 542            f"job URL is: {sync_result.job_url}"
 543        )
 544    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 545
 546
 547@mcp_tool(
 548    read_only=True,
 549    idempotent=True,
 550    open_world=True,
 551    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 552)
 553def check_airbyte_cloud_workspace(
 554    ctx: Context,
 555    *,
 556    workspace_id: Annotated[
 557        str | None,
 558        Field(
 559            description=WORKSPACE_ID_TIP_TEXT,
 560            default=None,
 561        ),
 562    ],
 563) -> CloudWorkspaceResult:
 564    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 565
 566    Returns workspace details including workspace ID, name, organization info, and billing status.
 567    """
 568    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 569
 570    # Get workspace details from the public API using workspace's credentials
 571    workspace_response = api_util.get_workspace(
 572        workspace_id=workspace.workspace_id,
 573        api_root=workspace.api_root,
 574        client_id=workspace.client_id,
 575        client_secret=workspace.client_secret,
 576        bearer_token=workspace.bearer_token,
 577    )
 578
 579    # Try to get organization info (including billing), but fail gracefully if we don't have
 580    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
 581    # organization, which may not be available with workspace-scoped credentials.
 582    organization = workspace.get_organization(raise_on_error=False)
 583
 584    return CloudWorkspaceResult(
 585        workspace_id=workspace_response.workspace_id,
 586        workspace_name=workspace_response.name,
 587        workspace_url=workspace.workspace_url,
 588        organization_id=(
 589            organization.organization_id
 590            if organization
 591            else "[unavailable - requires ORGANIZATION_READER permission]"
 592        ),
 593        organization_name=organization.organization_name if organization else None,
 594        payment_status=organization.payment_status if organization else None,
 595        subscription_status=organization.subscription_status if organization else None,
 596        is_account_locked=organization.is_account_locked if organization else False,
 597    )
 598
 599
 600@mcp_tool(
 601    open_world=True,
 602    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 603)
 604def deploy_noop_destination_to_cloud(
 605    ctx: Context,
 606    name: str = "No-op Destination",
 607    *,
 608    workspace_id: Annotated[
 609        str | None,
 610        Field(
 611            description=WORKSPACE_ID_TIP_TEXT,
 612            default=None,
 613        ),
 614    ],
 615    unique: bool = True,
 616) -> str:
 617    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 618    destination = get_noop_destination()
 619    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 620    deployed_destination = workspace.deploy_destination(
 621        name=name,
 622        destination=destination,
 623        unique=unique,
 624    )
 625    register_guid_created_in_session(deployed_destination.connector_id)
 626    return (
 627        f"Successfully deployed No-op Destination "
 628        f"with ID '{deployed_destination.connector_id}' and "
 629        f"URL: {deployed_destination.connector_url}"
 630    )
 631
 632
 633@mcp_tool(
 634    read_only=True,
 635    idempotent=True,
 636    open_world=True,
 637    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 638)
 639def get_cloud_sync_status(
 640    ctx: Context,
 641    connection_id: Annotated[
 642        str,
 643        Field(
 644            description="The ID of the Airbyte Cloud connection.",
 645        ),
 646    ],
 647    job_id: Annotated[
 648        int | None,
 649        Field(
 650            description="Optional job ID. If not provided, the latest job will be used.",
 651            default=None,
 652        ),
 653    ],
 654    *,
 655    workspace_id: Annotated[
 656        str | None,
 657        Field(
 658            description=WORKSPACE_ID_TIP_TEXT,
 659            default=None,
 660        ),
 661    ],
 662    include_attempts: Annotated[
 663        bool,
 664        Field(
 665            description="Whether to include detailed attempts information.",
 666            default=False,
 667        ),
 668    ],
 669) -> dict[str, Any]:
 670    """Get the status of a sync job from the Airbyte Cloud."""
 671    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 672    connection = workspace.get_connection(connection_id=connection_id)
 673
 674    # If a job ID is provided, get the job by ID.
 675    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 676
 677    if not sync_result:
 678        return {"status": None, "job_id": None, "attempts": []}
 679
 680    result = {
 681        "status": sync_result.get_job_status(),
 682        "job_id": sync_result.job_id,
 683        "bytes_synced": sync_result.bytes_synced,
 684        "records_synced": sync_result.records_synced,
 685        "start_time": sync_result.start_time.isoformat(),
 686        "job_url": sync_result.job_url,
 687        "attempts": [],
 688    }
 689
 690    if include_attempts:
 691        attempts = sync_result.get_attempts()
 692        result["attempts"] = [
 693            {
 694                "attempt_number": attempt.attempt_number,
 695                "attempt_id": attempt.attempt_id,
 696                "status": attempt.status,
 697                "bytes_synced": attempt.bytes_synced,
 698                "records_synced": attempt.records_synced,
 699                "created_at": attempt.created_at.isoformat(),
 700            }
 701            for attempt in attempts
 702        ]
 703
 704    return result
 705
 706
 707@mcp_tool(
 708    read_only=True,
 709    idempotent=True,
 710    open_world=True,
 711    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 712)
 713def list_cloud_sync_jobs(
 714    ctx: Context,
 715    connection_id: Annotated[
 716        str,
 717        Field(description="The ID of the Airbyte Cloud connection."),
 718    ],
 719    *,
 720    workspace_id: Annotated[
 721        str | None,
 722        Field(
 723            description=WORKSPACE_ID_TIP_TEXT,
 724            default=None,
 725        ),
 726    ],
 727    max_jobs: Annotated[
 728        int,
 729        Field(
 730            description=(
 731                "Maximum number of jobs to return. "
 732                "Defaults to 20 if not specified. "
 733                "Maximum allowed value is 500."
 734            ),
 735            default=20,
 736        ),
 737    ],
 738    from_tail: Annotated[
 739        bool | None,
 740        Field(
 741            description=(
 742                "When True, jobs are ordered newest-first (createdAt DESC). "
 743                "When False, jobs are ordered oldest-first (createdAt ASC). "
 744                "Defaults to True."
 745            ),
 746            default=None,
 747        ),
 748    ],
 749    job_type: Annotated[
 750        JobTypeEnum | None,
 751        Field(
 752            description=(
 753                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
 754                "If not specified, defaults to sync and reset jobs only (API default). "
 755                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
 756            ),
 757            default=None,
 758        ),
 759    ],
 760) -> SyncJobListResult:
 761    """List sync jobs for a connection with limit support.
 762
 763    This tool allows you to retrieve a list of sync jobs for a connection,
 764    with control over ordering and result limit. By default, jobs are returned
 765    newest-first (`from_tail=True`).
 766    """
 767    if from_tail is None:
 768        from_tail = True
 769
 770    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 771    connection = workspace.get_connection(connection_id=connection_id)
 772
 773    # Cap at 500 to avoid overloading agent context
 774    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 775
 776    sync_results = connection.get_previous_sync_logs(
 777        limit=effective_limit,
 778        from_tail=from_tail,
 779        job_type=job_type,
 780    )
 781
 782    jobs = [
 783        SyncJobResult(
 784            job_id=sync_result.job_id,
 785            status=str(sync_result.get_job_status()),
 786            bytes_synced=sync_result.bytes_synced,
 787            records_synced=sync_result.records_synced,
 788            start_time=sync_result.start_time.isoformat(),
 789            job_url=sync_result.job_url,
 790        )
 791        for sync_result in sync_results
 792    ]
 793
 794    return SyncJobListResult(
 795        jobs=jobs,
 796        jobs_count=len(jobs),
 797        from_tail=from_tail,
 798    )
 799
 800
 801@mcp_tool(
 802    read_only=True,
 803    idempotent=True,
 804    open_world=True,
 805    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 806)
 807def list_deployed_cloud_source_connectors(
 808    ctx: Context,
 809    *,
 810    workspace_id: Annotated[
 811        str | None,
 812        Field(
 813            description=WORKSPACE_ID_TIP_TEXT,
 814            default=None,
 815        ),
 816    ],
 817    name_contains: Annotated[
 818        str | None,
 819        Field(
 820            description="Optional case-insensitive substring to filter sources by name",
 821            default=None,
 822        ),
 823    ],
 824    limit: Annotated[
 825        int | None,
 826        Field(
 827            description="Optional maximum number of items to return (default: no limit)",
 828            default=None,
 829        ),
 830    ],
 831) -> list[CloudSourceResult]:
 832    """List all deployed source connectors in the Airbyte Cloud workspace."""
 833    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 834    sources = workspace.list_sources(limit=None if name_contains else limit)
 835
 836    # Filter by name if requested
 837    if name_contains:
 838        needle = name_contains.lower()
 839        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 840    if limit is not None:
 841        sources = sources[:limit]
 842
 843    # Note: name and url are guaranteed non-null from list API responses
 844    return [
 845        CloudSourceResult(
 846            id=source.source_id,
 847            name=cast(str, source.name),
 848            url=cast(str, source.connector_url),
 849        )
 850        for source in sources
 851    ]
 852
 853
 854@mcp_tool(
 855    read_only=True,
 856    idempotent=True,
 857    open_world=True,
 858    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 859)
 860def list_deployed_cloud_destination_connectors(
 861    ctx: Context,
 862    *,
 863    workspace_id: Annotated[
 864        str | None,
 865        Field(
 866            description=WORKSPACE_ID_TIP_TEXT,
 867            default=None,
 868        ),
 869    ],
 870    name_contains: Annotated[
 871        str | None,
 872        Field(
 873            description="Optional case-insensitive substring to filter destinations by name",
 874            default=None,
 875        ),
 876    ],
 877    limit: Annotated[
 878        int | None,
 879        Field(
 880            description="Optional maximum number of items to return (default: no limit)",
 881            default=None,
 882        ),
 883    ],
 884) -> list[CloudDestinationResult]:
 885    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 886    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 887    destinations = workspace.list_destinations(limit=None if name_contains else limit)
 888
 889    # Filter by name if requested
 890    if name_contains:
 891        needle = name_contains.lower()
 892        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 893    if limit is not None:
 894        destinations = destinations[:limit]
 895
 896    # Note: name and url are guaranteed non-null from list API responses
 897    return [
 898        CloudDestinationResult(
 899            id=destination.destination_id,
 900            name=cast(str, destination.name),
 901            url=cast(str, destination.connector_url),
 902        )
 903        for destination in destinations
 904    ]
 905
 906
 907@mcp_tool(
 908    read_only=True,
 909    idempotent=True,
 910    open_world=True,
 911    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 912)
 913def describe_cloud_source(
 914    ctx: Context,
 915    source_id: Annotated[
 916        str,
 917        Field(description="The ID of the source to describe."),
 918    ],
 919    *,
 920    workspace_id: Annotated[
 921        str | None,
 922        Field(
 923            description=WORKSPACE_ID_TIP_TEXT,
 924            default=None,
 925        ),
 926    ],
 927) -> CloudSourceDetails:
 928    """Get detailed information about a specific deployed source connector."""
 929    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 930    source = workspace.get_source(source_id=source_id)
 931
 932    # Access name property to ensure _connector_info is populated
 933    source_name = cast(str, source.name)
 934
 935    return CloudSourceDetails(
 936        source_id=source.source_id,
 937        source_name=source_name,
 938        source_url=source.connector_url,
 939        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 940    )
 941
 942
 943@mcp_tool(
 944    read_only=True,
 945    idempotent=True,
 946    open_world=True,
 947    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 948)
 949def describe_cloud_destination(
 950    ctx: Context,
 951    destination_id: Annotated[
 952        str,
 953        Field(description="The ID of the destination to describe."),
 954    ],
 955    *,
 956    workspace_id: Annotated[
 957        str | None,
 958        Field(
 959            description=WORKSPACE_ID_TIP_TEXT,
 960            default=None,
 961        ),
 962    ],
 963) -> CloudDestinationDetails:
 964    """Get detailed information about a specific deployed destination connector."""
 965    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 966    destination = workspace.get_destination(destination_id=destination_id)
 967
 968    # Access name property to ensure _connector_info is populated
 969    destination_name = cast(str, destination.name)
 970
 971    return CloudDestinationDetails(
 972        destination_id=destination.destination_id,
 973        destination_name=destination_name,
 974        destination_url=destination.connector_url,
 975        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 976    )
 977
 978
 979@mcp_tool(
 980    read_only=True,
 981    idempotent=True,
 982    open_world=True,
 983    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 984)
 985def describe_cloud_connection(
 986    ctx: Context,
 987    connection_id: Annotated[
 988        str,
 989        Field(description="The ID of the connection to describe."),
 990    ],
 991    *,
 992    workspace_id: Annotated[
 993        str | None,
 994        Field(
 995            description=WORKSPACE_ID_TIP_TEXT,
 996            default=None,
 997        ),
 998    ],
 999) -> CloudConnectionDetails:
1000    """Get detailed information about a specific deployed connection."""
1001    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1002    connection = workspace.get_connection(connection_id=connection_id)
1003
1004    return CloudConnectionDetails(
1005        connection_id=connection.connection_id,
1006        connection_name=cast(str, connection.name),
1007        connection_url=cast(str, connection.connection_url),
1008        source_id=connection.source_id,
1009        source_name=cast(str, connection.source.name),
1010        destination_id=connection.destination_id,
1011        destination_name=cast(str, connection.destination.name),
1012        selected_streams=connection.stream_names,
1013        table_prefix=connection.table_prefix,
1014    )
1015
1016
1017@mcp_tool(
1018    read_only=True,
1019    idempotent=True,
1020    open_world=True,
1021    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1022)
1023def get_cloud_sync_logs(
1024    ctx: Context,
1025    connection_id: Annotated[
1026        str,
1027        Field(description="The ID of the Airbyte Cloud connection."),
1028    ],
1029    job_id: Annotated[
1030        int | None,
1031        Field(description="Optional job ID. If not provided, the latest job will be used."),
1032    ] = None,
1033    attempt_number: Annotated[
1034        int | None,
1035        Field(
1036            description="Optional attempt number. If not provided, the latest attempt will be used."
1037        ),
1038    ] = None,
1039    *,
1040    workspace_id: Annotated[
1041        str | None,
1042        Field(
1043            description=WORKSPACE_ID_TIP_TEXT,
1044            default=None,
1045        ),
1046    ],
1047    max_lines: Annotated[
1048        int,
1049        Field(
1050            description=(
1051                "Maximum number of lines to return. "
1052                "Defaults to 4000 if not specified. "
1053                "If '0' is provided, no limit is applied."
1054            ),
1055            default=4000,
1056        ),
1057    ],
1058    from_tail: Annotated[
1059        bool | None,
1060        Field(
1061            description=(
1062                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
1063                "Defaults to True if `line_offset` is not specified. "
1064                "Cannot combine `from_tail=True` with `line_offset`."
1065            ),
1066            default=None,
1067        ),
1068    ],
1069    line_offset: Annotated[
1070        int | None,
1071        Field(
1072            description=(
1073                "Number of lines to skip from the beginning of the logs. "
1074                "Cannot be combined with `from_tail=True`."
1075            ),
1076            default=None,
1077        ),
1078    ],
1079) -> LogReadResult:
1080    """Get the logs from a sync job attempt on Airbyte Cloud."""
1081    # Validate that line_offset and from_tail are not both set
1082    if line_offset is not None and from_tail:
1083        raise PyAirbyteInputError(
1084            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
1085            context={"line_offset": line_offset, "from_tail": from_tail},
1086        )
1087
1088    if from_tail is None and line_offset is None:
1089        from_tail = True
1090    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1091    connection = workspace.get_connection(connection_id=connection_id)
1092
1093    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
1094
1095    if not sync_result:
1096        raise AirbyteMissingResourceError(
1097            resource_type="sync job",
1098            resource_name_or_id=connection_id,
1099        )
1100
1101    attempts = sync_result.get_attempts()
1102
1103    if not attempts:
1104        raise AirbyteMissingResourceError(
1105            resource_type="sync attempt",
1106            resource_name_or_id=str(sync_result.job_id),
1107        )
1108
1109    if attempt_number is not None:
1110        target_attempt = None
1111        for attempt in attempts:
1112            if attempt.attempt_number == attempt_number:
1113                target_attempt = attempt
1114                break
1115
1116        if target_attempt is None:
1117            raise AirbyteMissingResourceError(
1118                resource_type="sync attempt",
1119                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
1120            )
1121    else:
1122        target_attempt = max(attempts, key=lambda a: a.attempt_number)
1123
1124    logs = target_attempt.get_full_log_text()
1125
1126    if not logs:
1127        # Return empty result with zero lines
1128        return LogReadResult(
1129            log_text=(
1130                f"[No logs available for job '{sync_result.job_id}', "
1131                f"attempt {target_attempt.attempt_number}.]"
1132            ),
1133            log_text_start_line=1,
1134            log_text_line_count=0,
1135            total_log_lines_available=0,
1136            job_id=sync_result.job_id,
1137            attempt_number=target_attempt.attempt_number,
1138        )
1139
1140    # Apply line limiting
1141    log_lines = logs.splitlines()
1142    total_lines = len(log_lines)
1143
1144    # Determine effective max_lines (0 means no limit)
1145    effective_max = total_lines if max_lines == 0 else max_lines
1146
1147    # Calculate start_index and slice based on from_tail or line_offset
1148    if from_tail:
1149        start_index = max(0, total_lines - effective_max)
1150        selected_lines = log_lines[start_index:][:effective_max]
1151    else:
1152        start_index = line_offset or 0
1153        selected_lines = log_lines[start_index : start_index + effective_max]
1154
1155    return LogReadResult(
1156        log_text="\n".join(selected_lines),
1157        log_text_start_line=start_index + 1,  # Convert to 1-based index
1158        log_text_line_count=len(selected_lines),
1159        total_log_lines_available=total_lines,
1160        job_id=sync_result.job_id,
1161        attempt_number=target_attempt.attempt_number,
1162    )
1163
1164
1165@mcp_tool(
1166    read_only=True,
1167    idempotent=True,
1168    open_world=True,
1169    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1170)
1171def list_deployed_cloud_connections(
1172    ctx: Context,
1173    *,
1174    workspace_id: Annotated[
1175        str | None,
1176        Field(
1177            description=WORKSPACE_ID_TIP_TEXT,
1178            default=None,
1179        ),
1180    ],
1181    name_contains: Annotated[
1182        str | None,
1183        Field(
1184            description="Optional case-insensitive substring to filter connections by name",
1185            default=None,
1186        ),
1187    ],
1188    limit: Annotated[
1189        int | None,
1190        Field(
1191            description="Optional maximum number of items to return (default: no limit)",
1192            default=None,
1193        ),
1194    ],
1195    with_connection_status: Annotated[
1196        bool | None,
1197        Field(
1198            description="If True, include status info for each connection's most recent sync job",
1199            default=False,
1200        ),
1201    ],
1202    failing_connections_only: Annotated[
1203        bool | None,
1204        Field(
1205            description="If True, only return connections with failed/cancelled last sync",
1206            default=False,
1207        ),
1208    ],
1209) -> list[CloudConnectionResult]:
1210    """List all deployed connections in the Airbyte Cloud workspace.
1211
1212    When with_connection_status is True, each connection result will include
1213    information about the most recent sync job status, skipping over any
1214    currently in-progress syncs to find the last completed job.
1215
1216    When failing_connections_only is True, only connections where the most
1217    recent completed sync job failed or was cancelled will be returned.
1218    This implicitly enables with_connection_status.
1219    """
1220    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1221    connections = workspace.list_connections(
1222        limit=None if name_contains or failing_connections_only else limit
1223    )
1224
1225    # Filter by name if requested
1226    if name_contains:
1227        needle = name_contains.lower()
1228        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1229
1230    # If failing_connections_only is True, implicitly enable with_connection_status
1231    if failing_connections_only:
1232        with_connection_status = True
1233
1234    results: list[CloudConnectionResult] = []
1235
1236    for connection in connections:
1237        last_job_status: str | None = None
1238        last_job_id: int | None = None
1239        last_job_time: str | None = None
1240        currently_running_job_id: int | None = None
1241        currently_running_job_start_time: str | None = None
1242
1243        if with_connection_status:
1244            sync_logs = connection.get_previous_sync_logs(limit=5)
1245            last_completed_job_status = None  # Keep enum for comparison
1246
1247            for sync_result in sync_logs:
1248                job_status = sync_result.get_job_status()
1249
1250                if not sync_result.is_job_complete():
1251                    currently_running_job_id = sync_result.job_id
1252                    currently_running_job_start_time = sync_result.start_time.isoformat()
1253                    continue
1254
1255                last_completed_job_status = job_status
1256                last_job_status = str(job_status.value) if job_status else None
1257                last_job_id = sync_result.job_id
1258                last_job_time = sync_result.start_time.isoformat()
1259                break
1260
1261            if failing_connections_only and (
1262                last_completed_job_status is None
1263                or last_completed_job_status not in FAILED_STATUSES
1264            ):
1265                continue
1266
1267        results.append(
1268            CloudConnectionResult(
1269                id=connection.connection_id,
1270                name=cast(str, connection.name),
1271                url=cast(str, connection.connection_url),
1272                source_id=connection.source_id,
1273                destination_id=connection.destination_id,
1274                last_job_status=last_job_status,
1275                last_job_id=last_job_id,
1276                last_job_time=last_job_time,
1277                currently_running_job_id=currently_running_job_id,
1278                currently_running_job_start_time=currently_running_job_start_time,
1279            )
1280        )
1281
1282        if limit is not None and len(results) >= limit:
1283            break
1284
1285    return results
1286
1287
1288def _resolve_organization_id(
1289    organization_id: str | None,
1290    organization_name: str | None,
1291    *,
1292    client: CloudClient,
1293) -> str:
1294    """Resolve organization ID from either ID or exact name match."""
1295    if organization_id is not None:
1296        return organization_id
1297
1298    org = client.get_organization(
1299        organization_id=organization_id,
1300        organization_name=organization_name,
1301    )
1302    return org.organization_id
1303
1304
1305@mcp_tool(
1306    read_only=True,
1307    idempotent=True,
1308    open_world=True,
1309    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1310)
1311def list_cloud_workspaces(
1312    ctx: Context,
1313    *,
1314    organization_id: Annotated[
1315        str | None,
1316        Field(
1317            description="Organization ID. Required if organization_name is not provided.",
1318            default=None,
1319        ),
1320    ],
1321    organization_name: Annotated[
1322        str | None,
1323        Field(
1324            description=(
1325                "Organization name (exact match). " "Required if organization_id is not provided."
1326            ),
1327            default=None,
1328        ),
1329    ],
1330    name_contains: Annotated[
1331        str | None,
1332        Field(
1333            description="Optional substring to filter workspaces by name (server-side filtering)",
1334            default=None,
1335        ),
1336    ],
1337    limit: Annotated[
1338        int | None,
1339        Field(
1340            description="Optional maximum number of items to return (default: no limit)",
1341            default=None,
1342        ),
1343    ],
1344) -> list[CloudWorkspaceResult]:
1345    """List all workspaces in a specific organization.
1346
1347    Requires either organization_id OR organization_name (exact match) to be provided.
1348    This tool will NOT list workspaces across all organizations - you must specify
1349    which organization to list workspaces from.
1350    """
1351    client = _get_cloud_client(ctx)
1352
1353    resolved_org_id = _resolve_organization_id(
1354        organization_id=organization_id,
1355        organization_name=organization_name,
1356        client=client,
1357    )
1358
1359    workspaces = client.list_workspaces(
1360        organization_id=resolved_org_id,
1361        name_contains=name_contains,
1362        limit=limit,
1363    )
1364
1365    return [
1366        CloudWorkspaceResult(
1367            workspace_id=ws.get("workspaceId", ""),
1368            workspace_name=ws.get("name", ""),
1369            organization_id=ws.get("organizationId", ""),
1370        )
1371        for ws in workspaces
1372    ]
1373
1374
1375@mcp_tool(
1376    read_only=True,
1377    idempotent=True,
1378    open_world=True,
1379    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1380)
1381def describe_cloud_organization(
1382    ctx: Context,
1383    *,
1384    organization_id: Annotated[
1385        str | None,
1386        Field(
1387            description="Organization ID. Required if organization_name is not provided.",
1388            default=None,
1389        ),
1390    ],
1391    organization_name: Annotated[
1392        str | None,
1393        Field(
1394            description=(
1395                "Organization name (exact match). " "Required if organization_id is not provided."
1396            ),
1397            default=None,
1398        ),
1399    ],
1400) -> CloudOrganizationResult:
1401    """Get details about a specific organization including billing status.
1402
1403    Requires either organization_id OR organization_name (exact match) to be provided.
1404    This tool is useful for looking up an organization's ID from its name, or vice versa.
1405    """
1406    org = _get_cloud_client(ctx).get_organization(
1407        organization_id=organization_id,
1408        organization_name=organization_name,
1409    )
1410
1411    # CloudOrganization has lazy loading of billing properties
1412    return CloudOrganizationResult(
1413        id=org.organization_id,
1414        name=org.organization_name,
1415        email=org.email,
1416        payment_status=org.payment_status,
1417        subscription_status=org.subscription_status,
1418        is_account_locked=org.is_account_locked,
1419    )
1420
1421
1422def _get_custom_source_definition_description(
1423    custom_source: CustomCloudSourceDefinition,
1424) -> str:
1425    return "\n".join(
1426        [
1427            f" - Custom Source Name: {custom_source.name}",
1428            f" - Definition ID: {custom_source.definition_id}",
1429            f" - Definition Version: {custom_source.version}",
1430            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
1431            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
1432        ]
1433    )
1434
1435
1436@mcp_tool(
1437    open_world=True,
1438    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1439)
1440def publish_custom_source_definition(
1441    ctx: Context,
1442    name: Annotated[
1443        str,
1444        Field(description="The name for the custom connector definition."),
1445    ],
1446    *,
1447    workspace_id: Annotated[
1448        str | None,
1449        Field(
1450            description=WORKSPACE_ID_TIP_TEXT,
1451            default=None,
1452        ),
1453    ],
1454    manifest_yaml: Annotated[
1455        str | Path | None,
1456        Field(
1457            description=(
1458                "The Low-code CDK manifest as a YAML string or file path. "
1459                "Required for YAML connectors."
1460            ),
1461            default=None,
1462        ),
1463    ] = None,
1464    unique: Annotated[
1465        bool,
1466        Field(
1467            description="Whether to require a unique name.",
1468            default=True,
1469        ),
1470    ] = True,
1471    pre_validate: Annotated[
1472        bool,
1473        Field(
1474            description="Whether to validate the manifest client-side before publishing.",
1475            default=True,
1476        ),
1477    ] = True,
1478    testing_values: Annotated[
1479        dict | str | None,
1480        Field(
1481            description=(
1482                "Optional testing configuration values for the Builder UI. "
1483                "Can be provided as a JSON object or JSON string. "
1484                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1485                "If provided, these values replace any existing testing values "
1486                "for the connector builder project, allowing immediate test read operations."
1487            ),
1488            default=None,
1489        ),
1490    ],
1491    testing_values_secret_name: Annotated[
1492        str | None,
1493        Field(
1494            description=(
1495                "Optional name of a secret containing testing configuration values "
1496                "in JSON or YAML format. The secret will be resolved by the MCP "
1497                "server and merged into testing_values, with secret values taking "
1498                "precedence. This lets the agent reference secrets without sending "
1499                "raw values as tool arguments."
1500            ),
1501            default=None,
1502        ),
1503    ],
1504) -> str:
1505    """Publish a custom YAML source connector definition to Airbyte Cloud.
1506
1507    Note: Only YAML (declarative) connectors are currently supported.
1508    Docker-based custom sources are not yet available.
1509    """
1510    processed_manifest = manifest_yaml
1511    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1512        processed_manifest = Path(manifest_yaml)
1513
1514    # Resolve testing values from inline config and/or secret
1515    testing_values_dict: dict[str, Any] | None = None
1516    if testing_values is not None or testing_values_secret_name is not None:
1517        testing_values_dict = (
1518            resolve_connector_config(
1519                config=testing_values,
1520                config_secret_name=testing_values_secret_name,
1521            )
1522            or None
1523        )
1524
1525    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1526    custom_source = workspace.publish_custom_source_definition(
1527        name=name,
1528        manifest_yaml=processed_manifest,
1529        unique=unique,
1530        pre_validate=pre_validate,
1531        testing_values=testing_values_dict,
1532    )
1533    register_guid_created_in_session(custom_source.definition_id)
1534    return (
1535        "Successfully published custom YAML source definition:\n"
1536        + _get_custom_source_definition_description(
1537            custom_source=custom_source,
1538        )
1539        + "\n"
1540    )
1541
1542
1543@mcp_tool(
1544    read_only=True,
1545    idempotent=True,
1546    open_world=True,
1547)
1548def list_custom_source_definitions(
1549    ctx: Context,
1550    *,
1551    workspace_id: Annotated[
1552        str | None,
1553        Field(
1554            description=WORKSPACE_ID_TIP_TEXT,
1555            default=None,
1556        ),
1557    ],
1558) -> list[dict[str, Any]]:
1559    """List custom YAML source definitions in the Airbyte Cloud workspace.
1560
1561    Note: Only YAML (declarative) connectors are currently supported.
1562    Docker-based custom sources are not yet available.
1563    """
1564    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1565    definitions = workspace.list_custom_source_definitions(
1566        definition_type="yaml",
1567    )
1568
1569    return [
1570        {
1571            "definition_id": d.definition_id,
1572            "name": d.name,
1573            "version": d.version,
1574            "connector_builder_project_url": d.connector_builder_project_url,
1575        }
1576        for d in definitions
1577    ]
1578
1579
1580@mcp_tool(
1581    read_only=True,
1582    idempotent=True,
1583    open_world=True,
1584)
1585def get_custom_source_definition(
1586    ctx: Context,
1587    definition_id: Annotated[
1588        str,
1589        Field(description="The ID of the custom source definition to retrieve."),
1590    ],
1591    *,
1592    workspace_id: Annotated[
1593        str | None,
1594        Field(
1595            description=WORKSPACE_ID_TIP_TEXT,
1596            default=None,
1597        ),
1598    ],
1599    include_draft: Annotated[
1600        bool,
1601        Field(
1602            description=(
1603                "Whether to include the Connector Builder draft manifest in the response. "
1604                "If True and a draft exists, the response will include 'has_draft' and "
1605                "'draft_manifest' fields. Defaults to False."
1606            ),
1607            default=False,
1608        ),
1609    ] = False,
1610) -> dict[str, Any]:
1611    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.
1612
1613    Returns the full definition details including the published manifest YAML content.
1614    Optionally includes the Connector Builder draft manifest (unpublished changes)
1615    when include_draft=True.
1616
1617    Note: Only YAML (declarative) connectors are currently supported.
1618    Docker-based custom sources are not yet available.
1619    """
1620    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1621    definition = workspace.get_custom_source_definition(
1622        definition_id=definition_id,
1623        definition_type="yaml",
1624    )
1625
1626    result: dict[str, Any] = {
1627        "definition_id": definition.definition_id,
1628        "name": definition.name,
1629        "version": definition.version,
1630        "connector_builder_project_id": definition.connector_builder_project_id,
1631        "connector_builder_project_url": definition.connector_builder_project_url,
1632        "manifest": definition.manifest,
1633    }
1634
1635    if include_draft:
1636        result["has_draft"] = definition.has_draft
1637        result["draft_manifest"] = definition.draft_manifest
1638
1639    return result
1640
1641
1642@mcp_tool(
1643    read_only=True,
1644    idempotent=True,
1645    open_world=True,
1646)
1647def get_connector_builder_draft_manifest(
1648    ctx: Context,
1649    definition_id: Annotated[
1650        str,
1651        Field(description="The ID of the custom source definition to retrieve the draft for."),
1652    ],
1653    *,
1654    workspace_id: Annotated[
1655        str | None,
1656        Field(
1657            description=WORKSPACE_ID_TIP_TEXT,
1658            default=None,
1659        ),
1660    ],
1661) -> dict[str, Any]:
1662    """Get the Connector Builder draft manifest for a custom source definition.
1663
1664    Returns the working draft manifest that has been saved in the Connector Builder UI
1665    but not yet published. This is useful for inspecting what a user is currently working
1666    on before they publish their changes.
1667
1668    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
1669    The published manifest is always included for comparison.
1670    """
1671    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1672    definition = workspace.get_custom_source_definition(
1673        definition_id=definition_id,
1674        definition_type="yaml",
1675    )
1676
1677    return {
1678        "definition_id": definition.definition_id,
1679        "name": definition.name,
1680        "connector_builder_project_id": definition.connector_builder_project_id,
1681        "connector_builder_project_url": definition.connector_builder_project_url,
1682        "has_draft": definition.has_draft,
1683        "draft_manifest": definition.draft_manifest,
1684        "published_manifest": definition.manifest,
1685    }
1686
1687
1688@mcp_tool(
1689    destructive=True,
1690    open_world=True,
1691)
1692def update_custom_source_definition(
1693    ctx: Context,
1694    definition_id: Annotated[
1695        str,
1696        Field(description="The ID of the definition to update."),
1697    ],
1698    manifest_yaml: Annotated[
1699        str | Path | None,
1700        Field(
1701            description=(
1702                "New manifest as YAML string or file path. "
1703                "Optional; omit to update only testing values."
1704            ),
1705            default=None,
1706        ),
1707    ] = None,
1708    *,
1709    workspace_id: Annotated[
1710        str | None,
1711        Field(
1712            description=WORKSPACE_ID_TIP_TEXT,
1713            default=None,
1714        ),
1715    ],
1716    pre_validate: Annotated[
1717        bool,
1718        Field(
1719            description="Whether to validate the manifest client-side before updating.",
1720            default=True,
1721        ),
1722    ] = True,
1723    testing_values: Annotated[
1724        dict | str | None,
1725        Field(
1726            description=(
1727                "Optional testing configuration values for the Builder UI. "
1728                "Can be provided as a JSON object or JSON string. "
1729                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1730                "If provided, these values replace any existing testing values "
1731                "for the connector builder project. The entire testing values object "
1732                "is overwritten, so pass the full set of values you want to persist."
1733            ),
1734            default=None,
1735        ),
1736    ],
1737    testing_values_secret_name: Annotated[
1738        str | None,
1739        Field(
1740            description=(
1741                "Optional name of a secret containing testing configuration values "
1742                "in JSON or YAML format. The secret will be resolved by the MCP "
1743                "server and merged into testing_values, with secret values taking "
1744                "precedence. This lets the agent reference secrets without sending "
1745                "raw values as tool arguments."
1746            ),
1747            default=None,
1748        ),
1749    ],
1750) -> str:
1751    """Update a custom YAML source definition in Airbyte Cloud.
1752
1753    Updates the manifest and/or testing values for an existing custom source definition.
1754    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1755    """
1756    check_guid_created_in_session(definition_id)
1757
1758    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1759
1760    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
1761        raise PyAirbyteInputError(
1762            message=(
1763                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
1764                "must be provided to update a custom source definition."
1765            ),
1766            context={
1767                "definition_id": definition_id,
1768                "workspace_id": workspace.workspace_id,
1769            },
1770        )
1771
1772    processed_manifest: str | Path | None = manifest_yaml
1773    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1774        processed_manifest = Path(manifest_yaml)
1775
1776    # Resolve testing values from inline config and/or secret
1777    testing_values_dict: dict[str, Any] | None = None
1778    if testing_values is not None or testing_values_secret_name is not None:
1779        testing_values_dict = (
1780            resolve_connector_config(
1781                config=testing_values,
1782                config_secret_name=testing_values_secret_name,
1783            )
1784            or None
1785        )
1786
1787    definition = workspace.get_custom_source_definition(
1788        definition_id=definition_id,
1789        definition_type="yaml",
1790    )
1791    custom_source: CustomCloudSourceDefinition = definition
1792
1793    if processed_manifest is not None:
1794        custom_source = definition.update_definition(
1795            manifest_yaml=processed_manifest,
1796            pre_validate=pre_validate,
1797        )
1798
1799    if testing_values_dict is not None:
1800        custom_source.set_testing_values(testing_values_dict)
1801
1802    return (
1803        "Successfully updated custom YAML source definition:\n"
1804        + _get_custom_source_definition_description(
1805            custom_source=custom_source,
1806        )
1807    )
1808
1809
1810@mcp_tool(
1811    destructive=True,
1812    open_world=True,
1813)
1814def permanently_delete_custom_source_definition(
1815    ctx: Context,
1816    definition_id: Annotated[
1817        str,
1818        Field(description="The ID of the custom source definition to delete."),
1819    ],
1820    name: Annotated[
1821        str,
1822        Field(description="The expected name of the custom source definition (for verification)."),
1823    ],
1824    *,
1825    workspace_id: Annotated[
1826        str | None,
1827        Field(
1828            description=WORKSPACE_ID_TIP_TEXT,
1829            default=None,
1830        ),
1831    ],
1832) -> str:
1833    """Permanently delete a custom YAML source definition from Airbyte Cloud.
1834
1835    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
1836    (case insensitive).
1837
1838    If the connector does not meet this requirement, the deletion will be rejected with a
1839    helpful error message. Instruct the user to rename the connector appropriately to authorize
1840    the deletion.
1841
1842    The provided name must match the actual name of the definition for the operation to proceed.
1843    This is a safety measure to ensure you are deleting the correct resource.
1844
1845    Note: Only YAML (declarative) connectors are currently supported.
1846    Docker-based custom sources are not yet available.
1847    """
1848    check_guid_created_in_session(definition_id)
1849    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1850    definition = workspace.get_custom_source_definition(
1851        definition_id=definition_id,
1852        definition_type="yaml",
1853    )
1854    actual_name: str = definition.name
1855
1856    # Verify the name matches
1857    if actual_name != name:
1858        raise PyAirbyteInputError(
1859            message=(
1860                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1861                "The provided name must exactly match the definition's actual name. "
1862                "This is a safety measure to prevent accidental deletion."
1863            ),
1864            context={
1865                "definition_id": definition_id,
1866                "expected_name": name,
1867                "actual_name": actual_name,
1868            },
1869        )
1870
1871    definition.permanently_delete(
1872        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
1873    )
1874    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
1875
1876
1877@mcp_tool(
1878    destructive=True,
1879    open_world=True,
1880    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1881)
1882def permanently_delete_cloud_source(
1883    ctx: Context,
1884    source_id: Annotated[
1885        str,
1886        Field(description="The ID of the deployed source to delete."),
1887    ],
1888    name: Annotated[
1889        str,
1890        Field(description="The expected name of the source (for verification)."),
1891    ],
1892) -> str:
1893    """Permanently delete a deployed source connector from Airbyte Cloud.
1894
1895    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
1896    (case insensitive).
1897
1898    If the source does not meet this requirement, the deletion will be rejected with a
1899    helpful error message. Instruct the user to rename the source appropriately to authorize
1900    the deletion.
1901
1902    The provided name must match the actual name of the source for the operation to proceed.
1903    This is a safety measure to ensure you are deleting the correct resource.
1904    """
1905    check_guid_created_in_session(source_id)
1906    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
1907    source = workspace.get_source(source_id=source_id)
1908    actual_name: str = cast(str, source.name)
1909
1910    # Verify the name matches
1911    if actual_name != name:
1912        raise PyAirbyteInputError(
1913            message=(
1914                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1915                "The provided name must exactly match the source's actual name. "
1916                "This is a safety measure to prevent accidental deletion."
1917            ),
1918            context={
1919                "source_id": source_id,
1920                "expected_name": name,
1921                "actual_name": actual_name,
1922            },
1923        )
1924
1925    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1926    workspace.permanently_delete_source(
1927        source=source_id,
1928        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
1929    )
1930    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
1931
1932
1933@mcp_tool(
1934    destructive=True,
1935    open_world=True,
1936    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1937)
1938def permanently_delete_cloud_destination(
1939    ctx: Context,
1940    destination_id: Annotated[
1941        str,
1942        Field(description="The ID of the deployed destination to delete."),
1943    ],
1944    name: Annotated[
1945        str,
1946        Field(description="The expected name of the destination (for verification)."),
1947    ],
1948) -> str:
1949    """Permanently delete a deployed destination connector from Airbyte Cloud.
1950
1951    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
1952    (case insensitive).
1953
1954    If the destination does not meet this requirement, the deletion will be rejected with a
1955    helpful error message. Instruct the user to rename the destination appropriately to authorize
1956    the deletion.
1957
1958    The provided name must match the actual name of the destination for the operation to proceed.
1959    This is a safety measure to ensure you are deleting the correct resource.
1960    """
1961    check_guid_created_in_session(destination_id)
1962    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
1963    destination = workspace.get_destination(destination_id=destination_id)
1964    actual_name: str = cast(str, destination.name)
1965
1966    # Verify the name matches
1967    if actual_name != name:
1968        raise PyAirbyteInputError(
1969            message=(
1970                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1971                "The provided name must exactly match the destination's actual name. "
1972                "This is a safety measure to prevent accidental deletion."
1973            ),
1974            context={
1975                "destination_id": destination_id,
1976                "expected_name": name,
1977                "actual_name": actual_name,
1978            },
1979        )
1980
1981    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1982    workspace.permanently_delete_destination(
1983        destination=destination_id,
1984        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
1985    )
1986    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
1987
1988
1989@mcp_tool(
1990    destructive=True,
1991    open_world=True,
1992    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1993)
1994def permanently_delete_cloud_connection(
1995    ctx: Context,
1996    connection_id: Annotated[
1997        str,
1998        Field(description="The ID of the connection to delete."),
1999    ],
2000    name: Annotated[
2001        str,
2002        Field(description="The expected name of the connection (for verification)."),
2003    ],
2004    *,
2005    cascade_delete_source: Annotated[
2006        bool,
2007        Field(
2008            description=(
2009                "Whether to also delete the source connector associated with this connection."
2010            ),
2011            default=False,
2012        ),
2013    ] = False,
2014    cascade_delete_destination: Annotated[
2015        bool,
2016        Field(
2017            description=(
2018                "Whether to also delete the destination connector associated with this connection."
2019            ),
2020            default=False,
2021        ),
2022    ] = False,
2023) -> str:
2024    """Permanently delete a connection from Airbyte Cloud.
2025
2026    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
2027    (case insensitive).
2028
2029    If the connection does not meet this requirement, the deletion will be rejected with a
2030    helpful error message. Instruct the user to rename the connection appropriately to authorize
2031    the deletion.
2032
2033    The provided name must match the actual name of the connection for the operation to proceed.
2034    This is a safety measure to ensure you are deleting the correct resource.
2035    """
2036    check_guid_created_in_session(connection_id)
2037    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
2038    connection = workspace.get_connection(connection_id=connection_id)
2039    actual_name: str = cast(str, connection.name)
2040
2041    # Verify the name matches
2042    if actual_name != name:
2043        raise PyAirbyteInputError(
2044            message=(
2045                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2046                "The provided name must exactly match the connection's actual name. "
2047                "This is a safety measure to prevent accidental deletion."
2048            ),
2049            context={
2050                "connection_id": connection_id,
2051                "expected_name": name,
2052                "actual_name": actual_name,
2053            },
2054        )
2055
2056    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2057    workspace.permanently_delete_connection(
2058        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
2059        connection=connection_id,
2060        cascade_delete_source=cascade_delete_source,
2061        cascade_delete_destination=cascade_delete_destination,
2062    )
2063    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
2064
2065
2066@mcp_tool(
2067    open_world=True,
2068    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2069)
2070def rename_cloud_source(
2071    ctx: Context,
2072    source_id: Annotated[
2073        str,
2074        Field(description="The ID of the deployed source to rename."),
2075    ],
2076    name: Annotated[
2077        str,
2078        Field(description="New name for the source."),
2079    ],
2080    *,
2081    workspace_id: Annotated[
2082        str | None,
2083        Field(
2084            description=WORKSPACE_ID_TIP_TEXT,
2085            default=None,
2086        ),
2087    ],
2088) -> str:
2089    """Rename a deployed source connector on Airbyte Cloud."""
2090    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2091    source = workspace.get_source(source_id=source_id)
2092    source.rename(name=name)
2093    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
2094
2095
2096@mcp_tool(
2097    destructive=True,
2098    open_world=True,
2099    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2100)
2101def update_cloud_source_config(
2102    ctx: Context,
2103    source_id: Annotated[
2104        str,
2105        Field(description="The ID of the deployed source to update."),
2106    ],
2107    config: Annotated[
2108        dict | str,
2109        Field(
2110            description="New configuration for the source connector.",
2111        ),
2112    ],
2113    config_secret_name: Annotated[
2114        str | None,
2115        Field(
2116            description="The name of the secret containing the configuration.",
2117            default=None,
2118        ),
2119    ] = None,
2120    *,
2121    workspace_id: Annotated[
2122        str | None,
2123        Field(
2124            description=WORKSPACE_ID_TIP_TEXT,
2125            default=None,
2126        ),
2127    ],
2128) -> str:
2129    """Update a deployed source connector's configuration on Airbyte Cloud.
2130
2131    This is a destructive operation that can break existing connections if the
2132    configuration is changed incorrectly. Use with caution.
2133    """
2134    check_guid_created_in_session(source_id)
2135    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2136    source = workspace.get_source(source_id=source_id)
2137
2138    config_dict = resolve_connector_config(
2139        config=config,
2140        config_secret_name=config_secret_name,
2141        config_spec_jsonschema=None,  # We don't have the spec here
2142    )
2143
2144    source.update_config(config=config_dict)
2145    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
2146
2147
2148@mcp_tool(
2149    open_world=True,
2150    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2151)
2152def rename_cloud_destination(
2153    ctx: Context,
2154    destination_id: Annotated[
2155        str,
2156        Field(description="The ID of the deployed destination to rename."),
2157    ],
2158    name: Annotated[
2159        str,
2160        Field(description="New name for the destination."),
2161    ],
2162    *,
2163    workspace_id: Annotated[
2164        str | None,
2165        Field(
2166            description=WORKSPACE_ID_TIP_TEXT,
2167            default=None,
2168        ),
2169    ],
2170) -> str:
2171    """Rename a deployed destination connector on Airbyte Cloud."""
2172    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2173    destination = workspace.get_destination(destination_id=destination_id)
2174    destination.rename(name=name)
2175    return (
2176        f"Successfully renamed destination '{destination_id}' to '{name}'. "
2177        f"URL: {destination.connector_url}"
2178    )
2179
2180
2181@mcp_tool(
2182    destructive=True,
2183    open_world=True,
2184    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2185)
2186def update_cloud_destination_config(
2187    ctx: Context,
2188    destination_id: Annotated[
2189        str,
2190        Field(description="The ID of the deployed destination to update."),
2191    ],
2192    config: Annotated[
2193        dict | str,
2194        Field(
2195            description="New configuration for the destination connector.",
2196        ),
2197    ],
2198    config_secret_name: Annotated[
2199        str | None,
2200        Field(
2201            description="The name of the secret containing the configuration.",
2202            default=None,
2203        ),
2204    ],
2205    *,
2206    workspace_id: Annotated[
2207        str | None,
2208        Field(
2209            description=WORKSPACE_ID_TIP_TEXT,
2210            default=None,
2211        ),
2212    ],
2213) -> str:
2214    """Update a deployed destination connector's configuration on Airbyte Cloud.
2215
2216    This is a destructive operation that can break existing connections if the
2217    configuration is changed incorrectly. Use with caution.
2218    """
2219    check_guid_created_in_session(destination_id)
2220    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2221    destination = workspace.get_destination(destination_id=destination_id)
2222
2223    config_dict = resolve_connector_config(
2224        config=config,
2225        config_secret_name=config_secret_name,
2226        config_spec_jsonschema=None,  # We don't have the spec here
2227    )
2228
2229    destination.update_config(config=config_dict)
2230    return (
2231        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
2232    )
2233
2234
2235@mcp_tool(
2236    open_world=True,
2237    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2238)
2239def rename_cloud_connection(
2240    ctx: Context,
2241    connection_id: Annotated[
2242        str,
2243        Field(description="The ID of the connection to rename."),
2244    ],
2245    name: Annotated[
2246        str,
2247        Field(description="New name for the connection."),
2248    ],
2249    *,
2250    workspace_id: Annotated[
2251        str | None,
2252        Field(
2253            description=WORKSPACE_ID_TIP_TEXT,
2254            default=None,
2255        ),
2256    ],
2257) -> str:
2258    """Rename a connection on Airbyte Cloud."""
2259    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2260    connection = workspace.get_connection(connection_id=connection_id)
2261    connection.rename(name=name)
2262    return (
2263        f"Successfully renamed connection '{connection_id}' to '{name}'. "
2264        f"URL: {connection.connection_url}"
2265    )
2266
2267
2268@mcp_tool(
2269    destructive=True,
2270    open_world=True,
2271    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2272)
2273def set_cloud_connection_table_prefix(
2274    ctx: Context,
2275    connection_id: Annotated[
2276        str,
2277        Field(description="The ID of the connection to update."),
2278    ],
2279    prefix: Annotated[
2280        str,
2281        Field(description="New table prefix to use when syncing to the destination."),
2282    ],
2283    *,
2284    workspace_id: Annotated[
2285        str | None,
2286        Field(
2287            description=WORKSPACE_ID_TIP_TEXT,
2288            default=None,
2289        ),
2290    ],
2291) -> str:
2292    """Set the table prefix for a connection on Airbyte Cloud.
2293
2294    This is a destructive operation that can break downstream dependencies if the
2295    table prefix is changed incorrectly. Use with caution.
2296    """
2297    check_guid_created_in_session(connection_id)
2298    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2299    connection = workspace.get_connection(connection_id=connection_id)
2300    connection.set_table_prefix(prefix=prefix)
2301    return (
2302        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
2303        f"URL: {connection.connection_url}"
2304    )
2305
2306
2307@mcp_tool(
2308    destructive=True,
2309    open_world=True,
2310    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2311)
2312def set_cloud_connection_selected_streams(
2313    ctx: Context,
2314    connection_id: Annotated[
2315        str,
2316        Field(description="The ID of the connection to update."),
2317    ],
2318    stream_names: Annotated[
2319        str | list[str],
2320        Field(
2321            description=(
2322                "The selected stream names to sync within the connection. "
2323                "Must be an explicit stream name or list of streams."
2324            )
2325        ),
2326    ],
2327    *,
2328    workspace_id: Annotated[
2329        str | None,
2330        Field(
2331            description=WORKSPACE_ID_TIP_TEXT,
2332            default=None,
2333        ),
2334    ],
2335) -> str:
2336    """Set the selected streams for a connection on Airbyte Cloud.
2337
2338    This is a destructive operation that can break existing connections if the
2339    stream selection is changed incorrectly. Use with caution.
2340    """
2341    check_guid_created_in_session(connection_id)
2342    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2343    connection = workspace.get_connection(connection_id=connection_id)
2344
2345    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
2346    connection.set_selected_streams(stream_names=resolved_streams_list)
2347
2348    return (
2349        f"Successfully set selected streams for connection '{connection_id}' "
2350        f"to {resolved_streams_list}. URL: {connection.connection_url}"
2351    )
2352
2353
2354@mcp_tool(
2355    open_world=True,
2356    destructive=True,
2357    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2358)
2359def update_cloud_connection(
2360    ctx: Context,
2361    connection_id: Annotated[
2362        str,
2363        Field(description="The ID of the connection to update."),
2364    ],
2365    *,
2366    enabled: Annotated[
2367        bool | None,
2368        Field(
2369            description=(
2370                "Set the connection's enabled status. "
2371                "True enables the connection (status='active'), "
2372                "False disables it (status='inactive'). "
2373                "Leave unset to keep the current status."
2374            ),
2375            default=None,
2376        ),
2377    ],
2378    cron_expression: Annotated[
2379        str | None,
2380        Field(
2381            description=(
2382                "A cron expression defining when syncs should run. "
2383                "Examples: '0 0 * * *' (daily at midnight UTC), "
2384                "'0 */6 * * *' (every 6 hours), "
2385                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
2386                "Leave unset to keep the current schedule. "
2387                "Cannot be used together with 'manual_schedule'."
2388            ),
2389            default=None,
2390        ),
2391    ],
2392    manual_schedule: Annotated[
2393        bool | None,
2394        Field(
2395            description=(
2396                "Set to True to disable automatic syncs (manual scheduling only). "
2397                "Syncs will only run when manually triggered. "
2398                "Cannot be used together with 'cron_expression'."
2399            ),
2400            default=None,
2401        ),
2402    ],
2403    workspace_id: Annotated[
2404        str | None,
2405        Field(
2406            description=WORKSPACE_ID_TIP_TEXT,
2407            default=None,
2408        ),
2409    ],
2410) -> str:
2411    """Update a connection's settings on Airbyte Cloud.
2412
2413    This tool allows updating multiple connection settings in a single call:
2414    - Enable or disable the connection
2415    - Set a cron schedule for automatic syncs
2416    - Switch to manual scheduling (no automatic syncs)
2417
2418    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
2419    parameters are mutually exclusive.
2420    """
2421    check_guid_created_in_session(connection_id)
2422
2423    # Validate that at least one setting is provided
2424    if enabled is None and cron_expression is None and manual_schedule is None:
2425        raise ValueError(
2426            "At least one setting must be provided: 'enabled', 'cron_expression', "
2427            "or 'manual_schedule'."
2428        )
2429
2430    # Validate mutually exclusive schedule options
2431    if cron_expression is not None and manual_schedule is True:
2432        raise ValueError(
2433            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
2434            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
2435            "for manual-only syncs."
2436        )
2437
2438    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2439    connection = workspace.get_connection(connection_id=connection_id)
2440
2441    changes_made: list[str] = []
2442
2443    # Apply enabled status change
2444    if enabled is not None:
2445        connection.set_enabled(enabled=enabled)
2446        status_str = "enabled" if enabled else "disabled"
2447        changes_made.append(f"status set to '{status_str}'")
2448
2449    # Apply schedule change
2450    if cron_expression is not None:
2451        connection.set_schedule(cron_expression=cron_expression)
2452        changes_made.append(f"schedule set to '{cron_expression}'")
2453    elif manual_schedule is True:
2454        connection.set_manual_schedule()
2455        changes_made.append("schedule set to 'manual'")
2456
2457    changes_summary = ", ".join(changes_made)
2458    return (
2459        f"Successfully updated connection '{connection_id}': {changes_summary}. "
2460        f"URL: {connection.connection_url}"
2461    )
2462
2463
2464@mcp_tool(
2465    read_only=True,
2466    idempotent=True,
2467    open_world=True,
2468    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2469)
2470def get_connection_artifact(
2471    ctx: Context,
2472    connection_id: Annotated[
2473        str,
2474        Field(description="The ID of the Airbyte Cloud connection."),
2475    ],
2476    artifact_type: Annotated[
2477        Literal["state", "catalog"],
2478        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
2479    ],
2480    *,
2481    workspace_id: Annotated[
2482        str | None,
2483        Field(
2484            description=WORKSPACE_ID_TIP_TEXT,
2485            default=None,
2486        ),
2487    ],
2488) -> dict[str, Any] | list[dict[str, Any]]:
2489    """Get a connection artifact (state or catalog) from Airbyte Cloud.
2490
2491    By default, returns artifacts in Airbyte protocol format (snake_case,
2492    suitable for passing to connector CLI flags like `--state` or `--catalog`).
2493
2494    Retrieves the specified artifact for a connection:
2495    - `state`: Returns a list of protocol-format `AirbyteStateMessage` dicts,
2496      or `{"ERROR": "..."}` if no state is set.
2497    - `catalog`: Returns the protocol-format `ConfiguredAirbyteCatalog` dict,
2498      or `{"ERROR": "..."}` if not found.
2499    """
2500    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2501    connection = workspace.get_connection(connection_id=connection_id)
2502
2503    if artifact_type == "state":
2504        state = connection.dump_raw_state()
2505        if not state:
2506            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
2507        return state
2508
2509    # artifact_type == "catalog"
2510    catalog = connection.dump_raw_catalog()
2511    if catalog is None:
2512        return {"ERROR": "No catalog found for this connection"}
2513    return catalog
2514
2515
2516def _add_defaults_for_exclude_args(
2517    exclude_args: list[str],
2518) -> None:
2519    """Patch registered tool functions to add Python-level defaults for excluded args.
2520
2521    FastMCP requires that excluded args have Python-level default values, but MCP tool
2522    functions should only use Field(default=...) in their Annotated type hints (not
2523    Python-level `= None`). This function bridges the gap by dynamically adding Python
2524    defaults to the function signatures at registration time, so the source code stays
2525    clean while satisfying FastMCP's requirement.
2526
2527    Args:
2528        exclude_args: List of argument names that will be excluded from the tool schema.
2529    """
2530    import inspect  # noqa: PLC0415  # Local import for optional patching logic
2531
2532    from fastmcp_extensions.decorators import (  # noqa: PLC0415
2533        _REGISTERED_TOOLS,  # noqa: PLC2701
2534    )
2535
2536    for func, _annotations in _REGISTERED_TOOLS:
2537        sig = inspect.signature(func)
2538        needs_patch = any(
2539            arg_name in sig.parameters
2540            and sig.parameters[arg_name].default is inspect.Parameter.empty
2541            for arg_name in exclude_args
2542        )
2543        if needs_patch:
2544            new_params = [
2545                p.replace(default=None)
2546                if name in exclude_args and p.default is inspect.Parameter.empty
2547                else p
2548                for name, p in sig.parameters.items()
2549            ]
2550            func.__signature__ = sig.replace(parameters=new_params)  # type: ignore[attr-defined]
2551
2552
2553def register_cloud_tools(app: FastMCP) -> None:
2554    """Register cloud tools with the FastMCP app.
2555
2556    Args:
2557        app: FastMCP application instance
2558    """
2559    exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
2560    if exclude_args:
2561        _add_defaults_for_exclude_args(exclude_args)
2562    register_mcp_tools(
2563        app,
2564        mcp_module=__name__,
2565        exclude_args=exclude_args,
2566    )