airbyte.mcp.cloud

Airbyte Cloud MCP operations.

cloud module

MCP primitives registered by the cloud module of the airbyte-mcp server: 35 tool(s), 0 prompt(s), 0 resource(s).

Tools (35)

check_airbyte_cloud_workspace

Hints: read-only · idempotent · open-world

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, organization info, and billing status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Information about a workspace in Airbyte Cloud.",
  "properties": {
    "workspace_id": {
      "type": "string"
    },
    "workspace_name": {
      "type": "string"
    },
    "workspace_url": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "organization_id": {
      "type": "string"
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "payment_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "subscription_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "is_account_locked": {
      "default": false,
      "type": "boolean"
    }
  },
  "required": [
    "workspace_id",
    "workspace_name",
    "organization_id"
  ],
  "type": "object"
}

create_connection_on_cloud

Hints: open-world

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_name string yes The name of the connection.
source_id string yes The ID of the deployed source.
destination_id string yes The ID of the deployed destination.
selected_streams string | array<string> yes The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
table_prefix string | null no null Optional table prefix to use when syncing to the destination.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_name": {
      "description": "The name of the connection.",
      "type": "string"
    },
    "source_id": {
      "description": "The ID of the deployed source.",
      "type": "string"
    },
    "destination_id": {
      "description": "The ID of the deployed destination.",
      "type": "string"
    },
    "selected_streams": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      ],
      "description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "table_prefix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional table prefix to use when syncing to the destination."
    }
  },
  "required": [
    "connection_name",
    "source_id",
    "destination_id",
    "selected_streams"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_destination_to_cloud

Hints: open-world

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_name string yes The name to use when deploying the destination.
destination_connector_name string yes The name of the destination connector (e.g., 'destination-postgres').
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
config object | string | null no null The configuration for the destination connector.
config_secret_name string | null no null The name of the secret containing the configuration.
unique boolean no true Whether to require a unique name.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_name": {
      "description": "The name to use when deploying the destination.",
      "type": "string"
    },
    "destination_connector_name": {
      "description": "The name of the destination connector (e.g., 'destination-postgres').",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the destination connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    }
  },
  "required": [
    "destination_name",
    "destination_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_noop_destination_to_cloud

Hints: open-world

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
name string no "No-op Destination"
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
unique boolean no true

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name": {
      "default": "No-op Destination",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "unique": {
      "default": true,
      "type": "boolean"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

deploy_source_to_cloud

Hints: open-world

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_name string yes The name to use when deploying the source.
source_connector_name string yes The name of the source connector (e.g., 'source-faker').
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
config object | string | null no null The configuration for the source connector.
config_secret_name string | null no null The name of the secret containing the configuration.
unique boolean no true Whether to require a unique name.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_name": {
      "description": "The name to use when deploying the source.",
      "type": "string"
    },
    "source_connector_name": {
      "description": "The name of the source connector (e.g., 'source-faker').",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The configuration for the source connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    }
  },
  "required": [
    "source_name",
    "source_connector_name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

describe_cloud_connection

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed connection.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed connection in Airbyte Cloud.",
  "properties": {
    "connection_id": {
      "type": "string"
    },
    "connection_name": {
      "type": "string"
    },
    "connection_url": {
      "type": "string"
    },
    "source_id": {
      "type": "string"
    },
    "source_name": {
      "type": "string"
    },
    "destination_id": {
      "type": "string"
    },
    "destination_name": {
      "type": "string"
    },
    "selected_streams": {
      "items": {
        "type": "string"
      },
      "type": "array"
    },
    "table_prefix": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "required": [
    "connection_id",
    "connection_name",
    "connection_url",
    "source_id",
    "source_name",
    "destination_id",
    "destination_name",
    "selected_streams",
    "table_prefix"
  ],
  "type": "object"
}

describe_cloud_destination

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed destination connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the destination to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the destination to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed destination connector in Airbyte Cloud.",
  "properties": {
    "destination_id": {
      "type": "string"
    },
    "destination_name": {
      "type": "string"
    },
    "destination_url": {
      "type": "string"
    },
    "connector_definition_id": {
      "type": "string"
    }
  },
  "required": [
    "destination_id",
    "destination_name",
    "destination_url",
    "connector_definition_id"
  ],
  "type": "object"
}

describe_cloud_organization

Hints: read-only · idempotent · open-world

Get details about a specific organization including billing status.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
organization_id string | null no null Organization ID. Required if organization_name is not provided.
organization_name string | null no null Organization name (exact match). Required if organization_id is not provided.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization ID. Required if organization_name is not provided."
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization name (exact match). Required if organization_id is not provided."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "description": "Information about an organization in Airbyte Cloud.",
  "properties": {
    "id": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "email": {
      "type": "string"
    },
    "payment_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "subscription_status": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "is_account_locked": {
      "default": false,
      "type": "boolean"
    }
  },
  "required": [
    "id",
    "name",
    "email"
  ],
  "type": "object"
}

describe_cloud_source

Hints: read-only · idempotent · open-world

Get detailed information about a specific deployed source connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the source to describe.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the source to describe.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Detailed information about a deployed source connector in Airbyte Cloud.",
  "properties": {
    "source_id": {
      "type": "string"
    },
    "source_name": {
      "type": "string"
    },
    "source_url": {
      "type": "string"
    },
    "connector_definition_id": {
      "type": "string"
    }
  },
  "required": [
    "source_id",
    "source_name",
    "source_url",
    "connector_definition_id"
  ],
  "type": "object"
}

get_cloud_sync_logs

Hints: read-only · idempotent · open-world

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
job_id integer | null | null no null
attempt_number integer | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
max_lines integer no 4000 Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.
from_tail boolean | null no null Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if line_offset is not specified. Cannot combine from_tail=True with line_offset.
line_offset integer | null no null Number of lines to skip from the beginning of the logs. Cannot be combined with from_tail=True.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "job_id": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "description": "Optional job ID. If not provided, the latest job will be used."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "attempt_number": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "integer"
            },
            {
              "type": "null"
            }
          ],
          "description": "Optional attempt number. If not provided, the latest attempt will be used."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "max_lines": {
      "default": 4000,
      "description": "Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.",
      "type": "integer"
    },
    "from_tail": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`."
    },
    "line_offset": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of reading sync logs with pagination support.",
  "properties": {
    "job_id": {
      "type": "integer"
    },
    "attempt_number": {
      "type": "integer"
    },
    "log_text": {
      "type": "string"
    },
    "log_text_start_line": {
      "type": "integer"
    },
    "log_text_line_count": {
      "type": "integer"
    },
    "total_log_lines_available": {
      "type": "integer"
    }
  },
  "required": [
    "job_id",
    "attempt_number",
    "log_text",
    "log_text_start_line",
    "log_text_line_count",
    "total_log_lines_available"
  ],
  "type": "object"
}

get_cloud_sync_status

Hints: read-only · idempotent · open-world

Get the status of a sync job from the Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
job_id integer | null no null Optional job ID. If not provided, the latest job will be used.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
include_attempts boolean no false Whether to include detailed attempts information.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "job_id": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional job ID. If not provided, the latest job will be used."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "include_attempts": {
      "default": false,
      "description": "Whether to include detailed attempts information.",
      "type": "boolean"
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_connection_artifact

Hints: read-only · idempotent · open-world

Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all
  state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
  or {"ERROR": "..."} if not found.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
artifact_type enum("state", "catalog") yes The type of artifact to retrieve: 'state' or 'catalog'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "artifact_type": {
      "description": "The type of artifact to retrieve: 'state' or 'catalog'.",
      "enum": [
        "state",
        "catalog"
      ],
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "artifact_type"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_connector_builder_draft_manifest

Hints: read-only · idempotent · open-world

Get the Connector Builder draft manifest for a custom source definition.

Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.

If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to retrieve the draft for.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to retrieve the draft for.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

get_custom_source_definition

Hints: read-only · idempotent · open-world

Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to retrieve.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
include_draft boolean no false Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to retrieve.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "include_draft": {
      "default": false,
      "description": "Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.",
      "type": "boolean"
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "additionalProperties": true,
  "type": "object"
}

list_cloud_sync_jobs

Hints: read-only · idempotent · open-world

List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
max_jobs integer no 20 Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.
from_tail boolean | null no null When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if jobs_offset is not specified. Cannot combine from_tail=True with jobs_offset.
jobs_offset integer | null no null Number of jobs to skip from the beginning. Cannot be combined with from_tail=True.
job_type enum("sync", "reset", "refresh", "clear") | null no null Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "max_jobs": {
      "default": 20,
      "description": "Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.",
      "type": "integer"
    },
    "from_tail": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`."
    },
    "jobs_offset": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`."
    },
    "job_type": {
      "anyOf": [
        {
          "description": "Enum that describes the different types of jobs that the platform runs.",
          "enum": [
            "sync",
            "reset",
            "refresh",
            "clear"
          ],
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "description": "Result of listing sync jobs with pagination support.",
  "properties": {
    "jobs": {
      "items": {
        "description": "Information about a sync job.",
        "properties": {
          "job_id": {
            "type": "integer"
          },
          "status": {
            "type": "string"
          },
          "bytes_synced": {
            "type": "integer"
          },
          "records_synced": {
            "type": "integer"
          },
          "start_time": {
            "type": "string"
          },
          "job_url": {
            "type": "string"
          }
        },
        "required": [
          "job_id",
          "status",
          "bytes_synced",
          "records_synced",
          "start_time",
          "job_url"
        ],
        "type": "object"
      },
      "type": "array"
    },
    "jobs_count": {
      "type": "integer"
    },
    "jobs_offset": {
      "type": "integer"
    },
    "from_tail": {
      "type": "boolean"
    }
  },
  "required": [
    "jobs",
    "jobs_count",
    "jobs_offset",
    "from_tail"
  ],
  "type": "object"
}

list_cloud_workspaces

Hints: read-only · idempotent · open-world

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
organization_id string | null no null Organization ID. Required if organization_name is not provided.
organization_name string | null no null Organization name (exact match). Required if organization_id is not provided.
name_contains string | null no null Optional substring to filter workspaces by name (server-side filtering)
max_items_limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "organization_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization ID. Required if organization_name is not provided."
    },
    "organization_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Organization name (exact match). Required if organization_id is not provided."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional substring to filter workspaces by name (server-side filtering)"
    },
    "max_items_limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a workspace in Airbyte Cloud.",
        "properties": {
          "workspace_id": {
            "type": "string"
          },
          "workspace_name": {
            "type": "string"
          },
          "workspace_url": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "organization_id": {
            "type": "string"
          },
          "organization_name": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "payment_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "subscription_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "is_account_locked": {
            "default": false,
            "type": "boolean"
          }
        },
        "required": [
          "workspace_id",
          "workspace_name",
          "organization_id"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_custom_source_definitions

Hints: read-only · idempotent · open-world

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "additionalProperties": true,
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_connections

Hints: read-only · idempotent · open-world

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter connections by name
max_items_limit integer | null no null Optional maximum number of items to return (default: no limit)
with_connection_status boolean | null no false If True, include status info for each connection's most recent sync job
failing_connections_only boolean | null no false If True, only return connections with failed/cancelled last sync

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter connections by name"
    },
    "max_items_limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    },
    "with_connection_status": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": false,
      "description": "If True, include status info for each connection's most recent sync job"
    },
    "failing_connections_only": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": false,
      "description": "If True, only return connections with failed/cancelled last sync"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed connection in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          },
          "source_id": {
            "type": "string"
          },
          "destination_id": {
            "type": "string"
          },
          "last_job_status": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "last_job_id": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "last_job_time": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "currently_running_job_id": {
            "anyOf": [
              {
                "type": "integer"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          },
          "currently_running_job_start_time": {
            "anyOf": [
              {
                "type": "string"
              },
              {
                "type": "null"
              }
            ],
            "default": null
          }
        },
        "required": [
          "id",
          "name",
          "url",
          "source_id",
          "destination_id"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_destination_connectors

Hints: read-only · idempotent · open-world

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter destinations by name
max_items_limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter destinations by name"
    },
    "max_items_limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed destination connector in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          }
        },
        "required": [
          "id",
          "name",
          "url"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

list_deployed_cloud_source_connectors

Hints: read-only · idempotent · open-world

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
name_contains string | null no null Optional case-insensitive substring to filter sources by name
max_items_limit integer | null no null Optional maximum number of items to return (default: no limit)

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "name_contains": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional case-insensitive substring to filter sources by name"
    },
    "max_items_limit": {
      "anyOf": [
        {
          "type": "integer"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional maximum number of items to return (default: no limit)"
    }
  },
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "items": {
        "description": "Information about a deployed source connector in Airbyte Cloud.",
        "properties": {
          "id": {
            "type": "string"
          },
          "name": {
            "type": "string"
          },
          "url": {
            "type": "string"
          }
        },
        "required": [
          "id",
          "name",
          "url"
        ],
        "type": "object"
      },
      "type": "array"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_connection

Hints: destructive · open-world

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.

The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to delete.
name string yes The expected name of the connection (for verification).
cascade_delete_source boolean no false Whether to also delete the source connector associated with this connection.
cascade_delete_destination boolean no false Whether to also delete the destination connector associated with this connection.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the connection (for verification).",
      "type": "string"
    },
    "cascade_delete_source": {
      "default": false,
      "description": "Whether to also delete the source connector associated with this connection.",
      "type": "boolean"
    },
    "cascade_delete_destination": {
      "default": false,
      "description": "Whether to also delete the destination connector associated with this connection.",
      "type": "boolean"
    }
  },
  "required": [
    "connection_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_destination

Hints: destructive · open-world

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.

The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to delete.
name string yes The expected name of the destination (for verification).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the destination (for verification).",
      "type": "string"
    }
  },
  "required": [
    "destination_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_cloud_source

Hints: destructive · open-world

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.

The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to delete.
name string yes The expected name of the source (for verification).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the source (for verification).",
      "type": "string"
    }
  },
  "required": [
    "source_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

permanently_delete_custom_source_definition

Hints: destructive · open-world

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the custom source definition to delete.
name string yes The expected name of the custom source definition (for verification).
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the custom source definition to delete.",
      "type": "string"
    },
    "name": {
      "description": "The expected name of the custom source definition (for verification).",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "definition_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

publish_custom_source_definition

Hints: open-world

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
name string yes The name for the custom connector definition.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
manifest_yaml string | string | null | null no null
unique boolean no true Whether to require a unique name.
pre_validate boolean no true Whether to validate the manifest client-side before publishing.
testing_values object | string | null no null Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.
testing_values_secret_name string | null no null Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "name": {
      "description": "The name for the custom connector definition.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "manifest_yaml": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "path",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "unique": {
      "default": true,
      "description": "Whether to require a unique name.",
      "type": "boolean"
    },
    "pre_validate": {
      "default": true,
      "description": "Whether to validate the manifest client-side before publishing.",
      "type": "boolean"
    },
    "testing_values": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations."
    },
    "testing_values_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
    }
  },
  "required": [
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_connection

Hints: open-world

Rename a connection on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to rename.
name string yes New name for the connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_destination

Hints: open-world

Rename a deployed destination connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to rename.
name string yes New name for the destination.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the destination.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

rename_cloud_source

Hints: open-world

Rename a deployed source connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to rename.
name string yes New name for the source.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to rename.",
      "type": "string"
    },
    "name": {
      "description": "New name for the source.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id",
    "name"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

run_cloud_sync

Hints: open-world

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the Airbyte Cloud connection.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
wait boolean no false Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.
wait_timeout integer no 300 Maximum time to wait for sync completion (seconds).

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the Airbyte Cloud connection.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "wait": {
      "default": false,
      "description": "Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.",
      "type": "boolean"
    },
    "wait_timeout": {
      "default": 300,
      "description": "Maximum time to wait for sync completion (seconds).",
      "type": "integer"
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

set_cloud_connection_selected_streams

Hints: destructive · open-world

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
stream_names string | array<string> yes The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "stream_names": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "items": {
            "type": "string"
          },
          "type": "array"
        }
      ],
      "description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "stream_names"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

set_cloud_connection_table_prefix

Hints: destructive · open-world

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
prefix string yes New table prefix to use when syncing to the destination.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "prefix": {
      "description": "New table prefix to use when syncing to the destination.",
      "type": "string"
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id",
    "prefix"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_connection

Hints: destructive · open-world

Update a connection's settings on Airbyte Cloud.

This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)

At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
connection_id string yes The ID of the connection to update.
enabled boolean | null no null Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status.
cron_expression string | null no null A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'.
manual_schedule boolean | null no null Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "connection_id": {
      "description": "The ID of the connection to update.",
      "type": "string"
    },
    "enabled": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status."
    },
    "cron_expression": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'."
    },
    "manual_schedule": {
      "anyOf": [
        {
          "type": "boolean"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "connection_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_destination_config

Hints: destructive · open-world

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
destination_id string yes The ID of the deployed destination to update.
config object | string yes New configuration for the destination connector.
config_secret_name string | null no null The name of the secret containing the configuration.
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "destination_id": {
      "description": "The ID of the deployed destination to update.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        }
      ],
      "description": "New configuration for the destination connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "The name of the secret containing the configuration."
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "destination_id",
    "config"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_cloud_source_config

Hints: destructive · open-world

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

Parameters

Name Type Required Default Description
source_id string yes The ID of the deployed source to update.
config object | string yes New configuration for the source connector.
config_secret_name string | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "source_id": {
      "description": "The ID of the deployed source to update.",
      "type": "string"
    },
    "config": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        }
      ],
      "description": "New configuration for the source connector."
    },
    "config_secret_name": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "The name of the secret containing the configuration."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    }
  },
  "required": [
    "source_id",
    "config"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

update_custom_source_definition

Hints: destructive · open-world

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.

Parameters

Name Type Required Default Description
definition_id string yes The ID of the definition to update.
manifest_yaml string | string | null | null no null
workspace_id string | null no null Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var.
pre_validate boolean no true Whether to validate the manifest client-side before updating.
testing_values object | string | null no null Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.
testing_values_secret_name string | null no null Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.

Show input JSON schema

{
  "additionalProperties": false,
  "properties": {
    "definition_id": {
      "description": "The ID of the definition to update.",
      "type": "string"
    },
    "manifest_yaml": {
      "anyOf": [
        {
          "anyOf": [
            {
              "type": "string"
            },
            {
              "format": "path",
              "type": "string"
            },
            {
              "type": "null"
            }
          ],
          "default": null,
          "description": "New manifest as YAML string or file path. Optional; omit to update only testing values."
        },
        {
          "type": "null"
        }
      ],
      "default": null
    },
    "workspace_id": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
    },
    "pre_validate": {
      "default": true,
      "description": "Whether to validate the manifest client-side before updating.",
      "type": "boolean"
    },
    "testing_values": {
      "anyOf": [
        {
          "additionalProperties": true,
          "type": "object"
        },
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist."
    },
    "testing_values_secret_name": {
      "anyOf": [
        {
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
    }
  },
  "required": [
    "definition_id"
  ],
  "type": "object"
}

Show output JSON schema

{
  "properties": {
    "result": {
      "type": "string"
    }
  },
  "required": [
    "result"
  ],
  "type": "object",
  "x-fastmcp-wrap-result": true
}

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations.
   3
   4.. include:: ../../docs/mcp-generated/cloud.md
   5"""
   6
   7# No public Python API — MCP primitives are registered via decorators and
   8# documented via the generated Markdown include above. Setting `__all__` to an
   9# empty list tells pdoc (and other doc tools) not to surface the individual
  10# tool / helper definitions as a redundant "API Documentation" list.
  11__all__: list[str] = []
  12
  13from pathlib import Path
  14from typing import Annotated, Any, Literal, cast
  15
  16from airbyte_api.models import JobTypeEnum
  17from fastmcp import Context, FastMCP
  18from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
  19from pydantic import BaseModel, Field
  20
  21from airbyte import cloud, get_destination, get_source
  22from airbyte._util import api_util
  23from airbyte.cloud.connectors import CustomCloudSourceDefinition
  24from airbyte.cloud.constants import FAILED_STATUSES
  25from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
  26from airbyte.constants import (
  27    MCP_CONFIG_API_URL,
  28    MCP_CONFIG_BEARER_TOKEN,
  29    MCP_CONFIG_CLIENT_ID,
  30    MCP_CONFIG_CLIENT_SECRET,
  31    MCP_CONFIG_WORKSPACE_ID,
  32)
  33from airbyte.destinations.util import get_noop_destination
  34from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  35from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
  36from airbyte.mcp._tool_utils import (
  37    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
  38    check_guid_created_in_session,
  39    register_guid_created_in_session,
  40)
  41from airbyte.secrets import SecretString
  42
  43
  44CLOUD_AUTH_TIP_TEXT = (
  45    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
  46    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
  47    "will be used to authenticate with the Airbyte Cloud API."
  48)
  49WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  50
  51
  52class CloudSourceResult(BaseModel):
  53    """Information about a deployed source connector in Airbyte Cloud."""
  54
  55    id: str
  56    """The source ID."""
  57    name: str
  58    """Display name of the source."""
  59    url: str
  60    """Web URL for managing this source in Airbyte Cloud."""
  61
  62
  63class CloudDestinationResult(BaseModel):
  64    """Information about a deployed destination connector in Airbyte Cloud."""
  65
  66    id: str
  67    """The destination ID."""
  68    name: str
  69    """Display name of the destination."""
  70    url: str
  71    """Web URL for managing this destination in Airbyte Cloud."""
  72
  73
  74class CloudConnectionResult(BaseModel):
  75    """Information about a deployed connection in Airbyte Cloud."""
  76
  77    id: str
  78    """The connection ID."""
  79    name: str
  80    """Display name of the connection."""
  81    url: str
  82    """Web URL for managing this connection in Airbyte Cloud."""
  83    source_id: str
  84    """ID of the source used by this connection."""
  85    destination_id: str
  86    """ID of the destination used by this connection."""
  87    last_job_status: str | None = None
  88    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
  89    Only populated when with_connection_status=True."""
  90    last_job_id: int | None = None
  91    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
  92    last_job_time: str | None = None
  93    """ISO 8601 timestamp of the most recent completed sync.
  94    Only populated when with_connection_status=True."""
  95    currently_running_job_id: int | None = None
  96    """Job ID of a currently running sync, if any.
  97    Only populated when with_connection_status=True."""
  98    currently_running_job_start_time: str | None = None
  99    """ISO 8601 timestamp of when the currently running sync started.
 100    Only populated when with_connection_status=True."""
 101
 102
 103class CloudSourceDetails(BaseModel):
 104    """Detailed information about a deployed source connector in Airbyte Cloud."""
 105
 106    source_id: str
 107    """The source ID."""
 108    source_name: str
 109    """Display name of the source."""
 110    source_url: str
 111    """Web URL for managing this source in Airbyte Cloud."""
 112    connector_definition_id: str
 113    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 114
 115
 116class CloudDestinationDetails(BaseModel):
 117    """Detailed information about a deployed destination connector in Airbyte Cloud."""
 118
 119    destination_id: str
 120    """The destination ID."""
 121    destination_name: str
 122    """Display name of the destination."""
 123    destination_url: str
 124    """Web URL for managing this destination in Airbyte Cloud."""
 125    connector_definition_id: str
 126    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 127
 128
 129class CloudConnectionDetails(BaseModel):
 130    """Detailed information about a deployed connection in Airbyte Cloud."""
 131
 132    connection_id: str
 133    """The connection ID."""
 134    connection_name: str
 135    """Display name of the connection."""
 136    connection_url: str
 137    """Web URL for managing this connection in Airbyte Cloud."""
 138    source_id: str
 139    """ID of the source used by this connection."""
 140    source_name: str
 141    """Display name of the source."""
 142    destination_id: str
 143    """ID of the destination used by this connection."""
 144    destination_name: str
 145    """Display name of the destination."""
 146    selected_streams: list[str]
 147    """List of stream names selected for syncing."""
 148    table_prefix: str | None
 149    """Table prefix applied when syncing to the destination."""
 150
 151
 152class CloudOrganizationResult(BaseModel):
 153    """Information about an organization in Airbyte Cloud."""
 154
 155    id: str
 156    """The organization ID."""
 157    name: str
 158    """Display name of the organization."""
 159    email: str
 160    """Email associated with the organization."""
 161    payment_status: str | None = None
 162    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
 163    When 'disabled', syncs are blocked due to unpaid invoices."""
 164    subscription_status: str | None = None
 165    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
 166    'unsubscribed')."""
 167    is_account_locked: bool = False
 168    """Whether the account is locked due to billing issues.
 169    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
 170    Defaults to False unless we have affirmative evidence of a locked state."""
 171
 172
 173class CloudWorkspaceResult(BaseModel):
 174    """Information about a workspace in Airbyte Cloud."""
 175
 176    workspace_id: str
 177    """The workspace ID."""
 178    workspace_name: str
 179    """Display name of the workspace."""
 180    workspace_url: str | None = None
 181    """URL to access the workspace in Airbyte Cloud."""
 182    organization_id: str
 183    """ID of the organization (requires ORGANIZATION_READER permission)."""
 184    organization_name: str | None = None
 185    """Name of the organization (requires ORGANIZATION_READER permission)."""
 186    payment_status: str | None = None
 187    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
 188    When 'disabled', syncs are blocked due to unpaid invoices.
 189    Requires ORGANIZATION_READER permission."""
 190    subscription_status: str | None = None
 191    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
 192    'unsubscribed'). Requires ORGANIZATION_READER permission."""
 193    is_account_locked: bool = False
 194    """Whether the account is locked due to billing issues.
 195    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
 196    Defaults to False unless we have affirmative evidence of a locked state.
 197    Requires ORGANIZATION_READER permission."""
 198
 199
 200class LogReadResult(BaseModel):
 201    """Result of reading sync logs with pagination support."""
 202
 203    job_id: int
 204    """The job ID the logs belong to."""
 205    attempt_number: int
 206    """The attempt number the logs belong to."""
 207    log_text: str
 208    """The string containing the log text we are returning."""
 209    log_text_start_line: int
 210    """1-based line index of the first line returned."""
 211    log_text_line_count: int
 212    """Count of lines we are returning."""
 213    total_log_lines_available: int
 214    """Total number of log lines available, shows if any lines were missed due to the limit."""
 215
 216
 217class SyncJobResult(BaseModel):
 218    """Information about a sync job."""
 219
 220    job_id: int
 221    """The job ID."""
 222    status: str
 223    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
 224    bytes_synced: int
 225    """Number of bytes synced in this job."""
 226    records_synced: int
 227    """Number of records synced in this job."""
 228    start_time: str
 229    """ISO 8601 timestamp of when the job started."""
 230    job_url: str
 231    """URL to view the job in Airbyte Cloud."""
 232
 233
 234class SyncJobListResult(BaseModel):
 235    """Result of listing sync jobs with pagination support."""
 236
 237    jobs: list[SyncJobResult]
 238    """List of sync jobs."""
 239    jobs_count: int
 240    """Number of jobs returned in this response."""
 241    jobs_offset: int
 242    """Offset used for this request (0 if not specified)."""
 243    from_tail: bool
 244    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 245
 246
 247def _get_cloud_workspace(
 248    ctx: Context,
 249    workspace_id: str | None = None,
 250) -> CloudWorkspace:
 251    """Get an authenticated CloudWorkspace.
 252
 253    Resolves credentials from multiple sources via MCP config args in order:
 254    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 255    2. Environment variables
 256
 257    The ctx parameter provides access to MCP config values that are resolved
 258    from HTTP headers or environment variables based on the config args
 259    defined in server.py.
 260    """
 261    resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID)
 262    if not resolved_workspace_id:
 263        raise PyAirbyteInputError(
 264            message="Workspace ID is required but not provided.",
 265            guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.",
 266        )
 267
 268    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
 269    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
 270    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
 271    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
 272
 273    return CloudWorkspace(
 274        workspace_id=resolved_workspace_id,
 275        client_id=SecretString(client_id) if client_id else None,
 276        client_secret=SecretString(client_secret) if client_secret else None,
 277        bearer_token=SecretString(bearer_token) if bearer_token else None,
 278        api_root=api_url,
 279    )
 280
 281
 282@mcp_tool(
 283    open_world=True,
 284    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 285)
 286def deploy_source_to_cloud(
 287    ctx: Context,
 288    source_name: Annotated[
 289        str,
 290        Field(description="The name to use when deploying the source."),
 291    ],
 292    source_connector_name: Annotated[
 293        str,
 294        Field(description="The name of the source connector (e.g., 'source-faker')."),
 295    ],
 296    *,
 297    workspace_id: Annotated[
 298        str | None,
 299        Field(
 300            description=WORKSPACE_ID_TIP_TEXT,
 301            default=None,
 302        ),
 303    ],
 304    config: Annotated[
 305        dict | str | None,
 306        Field(
 307            description="The configuration for the source connector.",
 308            default=None,
 309        ),
 310    ],
 311    config_secret_name: Annotated[
 312        str | None,
 313        Field(
 314            description="The name of the secret containing the configuration.",
 315            default=None,
 316        ),
 317    ],
 318    unique: Annotated[
 319        bool,
 320        Field(
 321            description="Whether to require a unique name.",
 322            default=True,
 323        ),
 324    ],
 325) -> str:
 326    """Deploy a source connector to Airbyte Cloud."""
 327    source = get_source(
 328        source_connector_name,
 329        no_executor=True,
 330    )
 331    config_dict = resolve_connector_config(
 332        config=config,
 333        config_secret_name=config_secret_name,
 334        config_spec_jsonschema=source.config_spec,
 335    )
 336    source.set_config(config_dict, validate=True)
 337
 338    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 339    deployed_source = workspace.deploy_source(
 340        name=source_name,
 341        source=source,
 342        unique=unique,
 343    )
 344
 345    register_guid_created_in_session(deployed_source.connector_id)
 346    return (
 347        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 348        f" and URL: {deployed_source.connector_url}"
 349    )
 350
 351
 352@mcp_tool(
 353    open_world=True,
 354    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 355)
 356def deploy_destination_to_cloud(
 357    ctx: Context,
 358    destination_name: Annotated[
 359        str,
 360        Field(description="The name to use when deploying the destination."),
 361    ],
 362    destination_connector_name: Annotated[
 363        str,
 364        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 365    ],
 366    *,
 367    workspace_id: Annotated[
 368        str | None,
 369        Field(
 370            description=WORKSPACE_ID_TIP_TEXT,
 371            default=None,
 372        ),
 373    ],
 374    config: Annotated[
 375        dict | str | None,
 376        Field(
 377            description="The configuration for the destination connector.",
 378            default=None,
 379        ),
 380    ],
 381    config_secret_name: Annotated[
 382        str | None,
 383        Field(
 384            description="The name of the secret containing the configuration.",
 385            default=None,
 386        ),
 387    ],
 388    unique: Annotated[
 389        bool,
 390        Field(
 391            description="Whether to require a unique name.",
 392            default=True,
 393        ),
 394    ],
 395) -> str:
 396    """Deploy a destination connector to Airbyte Cloud."""
 397    destination = get_destination(
 398        destination_connector_name,
 399        no_executor=True,
 400    )
 401    config_dict = resolve_connector_config(
 402        config=config,
 403        config_secret_name=config_secret_name,
 404        config_spec_jsonschema=destination.config_spec,
 405    )
 406    destination.set_config(config_dict, validate=True)
 407
 408    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 409    deployed_destination = workspace.deploy_destination(
 410        name=destination_name,
 411        destination=destination,
 412        unique=unique,
 413    )
 414
 415    register_guid_created_in_session(deployed_destination.connector_id)
 416    return (
 417        f"Successfully deployed destination '{destination_name}' "
 418        f"with ID: {deployed_destination.connector_id}"
 419    )
 420
 421
 422@mcp_tool(
 423    open_world=True,
 424    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 425)
 426def create_connection_on_cloud(
 427    ctx: Context,
 428    connection_name: Annotated[
 429        str,
 430        Field(description="The name of the connection."),
 431    ],
 432    source_id: Annotated[
 433        str,
 434        Field(description="The ID of the deployed source."),
 435    ],
 436    destination_id: Annotated[
 437        str,
 438        Field(description="The ID of the deployed destination."),
 439    ],
 440    selected_streams: Annotated[
 441        str | list[str],
 442        Field(
 443            description=(
 444                "The selected stream names to sync within the connection. "
 445                "Must be an explicit stream name or list of streams. "
 446                "Cannot be empty or '*'."
 447            )
 448        ),
 449    ],
 450    *,
 451    workspace_id: Annotated[
 452        str | None,
 453        Field(
 454            description=WORKSPACE_ID_TIP_TEXT,
 455            default=None,
 456        ),
 457    ],
 458    table_prefix: Annotated[
 459        str | None,
 460        Field(
 461            description="Optional table prefix to use when syncing to the destination.",
 462            default=None,
 463        ),
 464    ],
 465) -> str:
 466    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 467    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 468    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 469    deployed_connection = workspace.deploy_connection(
 470        connection_name=connection_name,
 471        source=source_id,
 472        destination=destination_id,
 473        selected_streams=resolved_streams_list,
 474        table_prefix=table_prefix,
 475    )
 476
 477    register_guid_created_in_session(deployed_connection.connection_id)
 478    return (
 479        f"Successfully created connection '{connection_name}' "
 480        f"with ID '{deployed_connection.connection_id}' and "
 481        f"URL: {deployed_connection.connection_url}"
 482    )
 483
 484
 485@mcp_tool(
 486    open_world=True,
 487    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 488)
 489def run_cloud_sync(
 490    ctx: Context,
 491    connection_id: Annotated[
 492        str,
 493        Field(description="The ID of the Airbyte Cloud connection."),
 494    ],
 495    *,
 496    workspace_id: Annotated[
 497        str | None,
 498        Field(
 499            description=WORKSPACE_ID_TIP_TEXT,
 500            default=None,
 501        ),
 502    ],
 503    wait: Annotated[
 504        bool,
 505        Field(
 506            description=(
 507                "Whether to wait for the sync to complete. Since a sync can take between several "
 508                "minutes and several hours, this option is not recommended for most "
 509                "scenarios."
 510            ),
 511            default=False,
 512        ),
 513    ],
 514    wait_timeout: Annotated[
 515        int,
 516        Field(
 517            description="Maximum time to wait for sync completion (seconds).",
 518            default=300,
 519        ),
 520    ],
 521) -> str:
 522    """Run a sync job on Airbyte Cloud."""
 523    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 524    connection = workspace.get_connection(connection_id=connection_id)
 525    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 526
 527    if wait:
 528        status = sync_result.get_job_status()
 529        return (
 530            f"Sync completed with status: {status}. "
 531            f"Job ID is '{sync_result.job_id}' and "
 532            f"job URL is: {sync_result.job_url}"
 533        )
 534    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 535
 536
 537@mcp_tool(
 538    read_only=True,
 539    idempotent=True,
 540    open_world=True,
 541    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 542)
 543def check_airbyte_cloud_workspace(
 544    ctx: Context,
 545    *,
 546    workspace_id: Annotated[
 547        str | None,
 548        Field(
 549            description=WORKSPACE_ID_TIP_TEXT,
 550            default=None,
 551        ),
 552    ],
 553) -> CloudWorkspaceResult:
 554    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 555
 556    Returns workspace details including workspace ID, name, organization info, and billing status.
 557    """
 558    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 559
 560    # Get workspace details from the public API using workspace's credentials
 561    workspace_response = api_util.get_workspace(
 562        workspace_id=workspace.workspace_id,
 563        api_root=workspace.api_root,
 564        client_id=workspace.client_id,
 565        client_secret=workspace.client_secret,
 566        bearer_token=workspace.bearer_token,
 567    )
 568
 569    # Try to get organization info (including billing), but fail gracefully if we don't have
 570    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
 571    # organization, which may not be available with workspace-scoped credentials.
 572    organization = workspace.get_organization(raise_on_error=False)
 573
 574    return CloudWorkspaceResult(
 575        workspace_id=workspace_response.workspace_id,
 576        workspace_name=workspace_response.name,
 577        workspace_url=workspace.workspace_url,
 578        organization_id=(
 579            organization.organization_id
 580            if organization
 581            else "[unavailable - requires ORGANIZATION_READER permission]"
 582        ),
 583        organization_name=organization.organization_name if organization else None,
 584        payment_status=organization.payment_status if organization else None,
 585        subscription_status=organization.subscription_status if organization else None,
 586        is_account_locked=organization.is_account_locked if organization else False,
 587    )
 588
 589
 590@mcp_tool(
 591    open_world=True,
 592    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 593)
 594def deploy_noop_destination_to_cloud(
 595    ctx: Context,
 596    name: str = "No-op Destination",
 597    *,
 598    workspace_id: Annotated[
 599        str | None,
 600        Field(
 601            description=WORKSPACE_ID_TIP_TEXT,
 602            default=None,
 603        ),
 604    ],
 605    unique: bool = True,
 606) -> str:
 607    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 608    destination = get_noop_destination()
 609    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 610    deployed_destination = workspace.deploy_destination(
 611        name=name,
 612        destination=destination,
 613        unique=unique,
 614    )
 615    register_guid_created_in_session(deployed_destination.connector_id)
 616    return (
 617        f"Successfully deployed No-op Destination "
 618        f"with ID '{deployed_destination.connector_id}' and "
 619        f"URL: {deployed_destination.connector_url}"
 620    )
 621
 622
 623@mcp_tool(
 624    read_only=True,
 625    idempotent=True,
 626    open_world=True,
 627    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 628)
 629def get_cloud_sync_status(
 630    ctx: Context,
 631    connection_id: Annotated[
 632        str,
 633        Field(
 634            description="The ID of the Airbyte Cloud connection.",
 635        ),
 636    ],
 637    job_id: Annotated[
 638        int | None,
 639        Field(
 640            description="Optional job ID. If not provided, the latest job will be used.",
 641            default=None,
 642        ),
 643    ],
 644    *,
 645    workspace_id: Annotated[
 646        str | None,
 647        Field(
 648            description=WORKSPACE_ID_TIP_TEXT,
 649            default=None,
 650        ),
 651    ],
 652    include_attempts: Annotated[
 653        bool,
 654        Field(
 655            description="Whether to include detailed attempts information.",
 656            default=False,
 657        ),
 658    ],
 659) -> dict[str, Any]:
 660    """Get the status of a sync job from the Airbyte Cloud."""
 661    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 662    connection = workspace.get_connection(connection_id=connection_id)
 663
 664    # If a job ID is provided, get the job by ID.
 665    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 666
 667    if not sync_result:
 668        return {"status": None, "job_id": None, "attempts": []}
 669
 670    result = {
 671        "status": sync_result.get_job_status(),
 672        "job_id": sync_result.job_id,
 673        "bytes_synced": sync_result.bytes_synced,
 674        "records_synced": sync_result.records_synced,
 675        "start_time": sync_result.start_time.isoformat(),
 676        "job_url": sync_result.job_url,
 677        "attempts": [],
 678    }
 679
 680    if include_attempts:
 681        attempts = sync_result.get_attempts()
 682        result["attempts"] = [
 683            {
 684                "attempt_number": attempt.attempt_number,
 685                "attempt_id": attempt.attempt_id,
 686                "status": attempt.status,
 687                "bytes_synced": attempt.bytes_synced,
 688                "records_synced": attempt.records_synced,
 689                "created_at": attempt.created_at.isoformat(),
 690            }
 691            for attempt in attempts
 692        ]
 693
 694    return result
 695
 696
 697@mcp_tool(
 698    read_only=True,
 699    idempotent=True,
 700    open_world=True,
 701    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 702)
 703def list_cloud_sync_jobs(
 704    ctx: Context,
 705    connection_id: Annotated[
 706        str,
 707        Field(description="The ID of the Airbyte Cloud connection."),
 708    ],
 709    *,
 710    workspace_id: Annotated[
 711        str | None,
 712        Field(
 713            description=WORKSPACE_ID_TIP_TEXT,
 714            default=None,
 715        ),
 716    ],
 717    max_jobs: Annotated[
 718        int,
 719        Field(
 720            description=(
 721                "Maximum number of jobs to return. "
 722                "Defaults to 20 if not specified. "
 723                "Maximum allowed value is 500."
 724            ),
 725            default=20,
 726        ),
 727    ],
 728    from_tail: Annotated[
 729        bool | None,
 730        Field(
 731            description=(
 732                "When True, jobs are ordered newest-first (createdAt DESC). "
 733                "When False, jobs are ordered oldest-first (createdAt ASC). "
 734                "Defaults to True if `jobs_offset` is not specified. "
 735                "Cannot combine `from_tail=True` with `jobs_offset`."
 736            ),
 737            default=None,
 738        ),
 739    ],
 740    jobs_offset: Annotated[
 741        int | None,
 742        Field(
 743            description=(
 744                "Number of jobs to skip from the beginning. "
 745                "Cannot be combined with `from_tail=True`."
 746            ),
 747            default=None,
 748        ),
 749    ],
 750    job_type: Annotated[
 751        JobTypeEnum | None,
 752        Field(
 753            description=(
 754                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
 755                "If not specified, defaults to sync and reset jobs only (API default). "
 756                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
 757            ),
 758            default=None,
 759        ),
 760    ],
 761) -> SyncJobListResult:
 762    """List sync jobs for a connection with pagination support.
 763
 764    This tool allows you to retrieve a list of sync jobs for a connection,
 765    with control over ordering and pagination. By default, jobs are returned
 766    newest-first (from_tail=True).
 767    """
 768    # Validate that jobs_offset and from_tail are not both set
 769    if jobs_offset is not None and from_tail is True:
 770        raise PyAirbyteInputError(
 771            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
 772            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
 773        )
 774
 775    # Default to from_tail=True if neither is specified
 776    if from_tail is None and jobs_offset is None:
 777        from_tail = True
 778    elif from_tail is None:
 779        from_tail = False
 780
 781    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 782    connection = workspace.get_connection(connection_id=connection_id)
 783
 784    # Cap at 500 to avoid overloading agent context
 785    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 786
 787    sync_results = connection.get_previous_sync_logs(
 788        limit=effective_limit,
 789        offset=jobs_offset,
 790        from_tail=from_tail,
 791        job_type=job_type,
 792    )
 793
 794    jobs = [
 795        SyncJobResult(
 796            job_id=sync_result.job_id,
 797            status=str(sync_result.get_job_status()),
 798            bytes_synced=sync_result.bytes_synced,
 799            records_synced=sync_result.records_synced,
 800            start_time=sync_result.start_time.isoformat(),
 801            job_url=sync_result.job_url,
 802        )
 803        for sync_result in sync_results
 804    ]
 805
 806    return SyncJobListResult(
 807        jobs=jobs,
 808        jobs_count=len(jobs),
 809        jobs_offset=jobs_offset or 0,
 810        from_tail=from_tail,
 811    )
 812
 813
 814@mcp_tool(
 815    read_only=True,
 816    idempotent=True,
 817    open_world=True,
 818    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 819)
 820def list_deployed_cloud_source_connectors(
 821    ctx: Context,
 822    *,
 823    workspace_id: Annotated[
 824        str | None,
 825        Field(
 826            description=WORKSPACE_ID_TIP_TEXT,
 827            default=None,
 828        ),
 829    ],
 830    name_contains: Annotated[
 831        str | None,
 832        Field(
 833            description="Optional case-insensitive substring to filter sources by name",
 834            default=None,
 835        ),
 836    ],
 837    max_items_limit: Annotated[
 838        int | None,
 839        Field(
 840            description="Optional maximum number of items to return (default: no limit)",
 841            default=None,
 842        ),
 843    ],
 844) -> list[CloudSourceResult]:
 845    """List all deployed source connectors in the Airbyte Cloud workspace."""
 846    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 847    sources = workspace.list_sources()
 848
 849    # Filter by name if requested
 850    if name_contains:
 851        needle = name_contains.lower()
 852        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 853
 854    # Apply limit if requested
 855    if max_items_limit is not None:
 856        sources = sources[:max_items_limit]
 857
 858    # Note: name and url are guaranteed non-null from list API responses
 859    return [
 860        CloudSourceResult(
 861            id=source.source_id,
 862            name=cast(str, source.name),
 863            url=cast(str, source.connector_url),
 864        )
 865        for source in sources
 866    ]
 867
 868
 869@mcp_tool(
 870    read_only=True,
 871    idempotent=True,
 872    open_world=True,
 873    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 874)
 875def list_deployed_cloud_destination_connectors(
 876    ctx: Context,
 877    *,
 878    workspace_id: Annotated[
 879        str | None,
 880        Field(
 881            description=WORKSPACE_ID_TIP_TEXT,
 882            default=None,
 883        ),
 884    ],
 885    name_contains: Annotated[
 886        str | None,
 887        Field(
 888            description="Optional case-insensitive substring to filter destinations by name",
 889            default=None,
 890        ),
 891    ],
 892    max_items_limit: Annotated[
 893        int | None,
 894        Field(
 895            description="Optional maximum number of items to return (default: no limit)",
 896            default=None,
 897        ),
 898    ],
 899) -> list[CloudDestinationResult]:
 900    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 901    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 902    destinations = workspace.list_destinations()
 903
 904    # Filter by name if requested
 905    if name_contains:
 906        needle = name_contains.lower()
 907        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 908
 909    # Apply limit if requested
 910    if max_items_limit is not None:
 911        destinations = destinations[:max_items_limit]
 912
 913    # Note: name and url are guaranteed non-null from list API responses
 914    return [
 915        CloudDestinationResult(
 916            id=destination.destination_id,
 917            name=cast(str, destination.name),
 918            url=cast(str, destination.connector_url),
 919        )
 920        for destination in destinations
 921    ]
 922
 923
 924@mcp_tool(
 925    read_only=True,
 926    idempotent=True,
 927    open_world=True,
 928    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 929)
 930def describe_cloud_source(
 931    ctx: Context,
 932    source_id: Annotated[
 933        str,
 934        Field(description="The ID of the source to describe."),
 935    ],
 936    *,
 937    workspace_id: Annotated[
 938        str | None,
 939        Field(
 940            description=WORKSPACE_ID_TIP_TEXT,
 941            default=None,
 942        ),
 943    ],
 944) -> CloudSourceDetails:
 945    """Get detailed information about a specific deployed source connector."""
 946    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 947    source = workspace.get_source(source_id=source_id)
 948
 949    # Access name property to ensure _connector_info is populated
 950    source_name = cast(str, source.name)
 951
 952    return CloudSourceDetails(
 953        source_id=source.source_id,
 954        source_name=source_name,
 955        source_url=source.connector_url,
 956        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 957    )
 958
 959
 960@mcp_tool(
 961    read_only=True,
 962    idempotent=True,
 963    open_world=True,
 964    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 965)
 966def describe_cloud_destination(
 967    ctx: Context,
 968    destination_id: Annotated[
 969        str,
 970        Field(description="The ID of the destination to describe."),
 971    ],
 972    *,
 973    workspace_id: Annotated[
 974        str | None,
 975        Field(
 976            description=WORKSPACE_ID_TIP_TEXT,
 977            default=None,
 978        ),
 979    ],
 980) -> CloudDestinationDetails:
 981    """Get detailed information about a specific deployed destination connector."""
 982    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 983    destination = workspace.get_destination(destination_id=destination_id)
 984
 985    # Access name property to ensure _connector_info is populated
 986    destination_name = cast(str, destination.name)
 987
 988    return CloudDestinationDetails(
 989        destination_id=destination.destination_id,
 990        destination_name=destination_name,
 991        destination_url=destination.connector_url,
 992        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 993    )
 994
 995
 996@mcp_tool(
 997    read_only=True,
 998    idempotent=True,
 999    open_world=True,
1000    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1001)
1002def describe_cloud_connection(
1003    ctx: Context,
1004    connection_id: Annotated[
1005        str,
1006        Field(description="The ID of the connection to describe."),
1007    ],
1008    *,
1009    workspace_id: Annotated[
1010        str | None,
1011        Field(
1012            description=WORKSPACE_ID_TIP_TEXT,
1013            default=None,
1014        ),
1015    ],
1016) -> CloudConnectionDetails:
1017    """Get detailed information about a specific deployed connection."""
1018    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1019    connection = workspace.get_connection(connection_id=connection_id)
1020
1021    return CloudConnectionDetails(
1022        connection_id=connection.connection_id,
1023        connection_name=cast(str, connection.name),
1024        connection_url=cast(str, connection.connection_url),
1025        source_id=connection.source_id,
1026        source_name=cast(str, connection.source.name),
1027        destination_id=connection.destination_id,
1028        destination_name=cast(str, connection.destination.name),
1029        selected_streams=connection.stream_names,
1030        table_prefix=connection.table_prefix,
1031    )
1032
1033
1034@mcp_tool(
1035    read_only=True,
1036    idempotent=True,
1037    open_world=True,
1038    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1039)
1040def get_cloud_sync_logs(
1041    ctx: Context,
1042    connection_id: Annotated[
1043        str,
1044        Field(description="The ID of the Airbyte Cloud connection."),
1045    ],
1046    job_id: Annotated[
1047        int | None,
1048        Field(description="Optional job ID. If not provided, the latest job will be used."),
1049    ] = None,
1050    attempt_number: Annotated[
1051        int | None,
1052        Field(
1053            description="Optional attempt number. If not provided, the latest attempt will be used."
1054        ),
1055    ] = None,
1056    *,
1057    workspace_id: Annotated[
1058        str | None,
1059        Field(
1060            description=WORKSPACE_ID_TIP_TEXT,
1061            default=None,
1062        ),
1063    ],
1064    max_lines: Annotated[
1065        int,
1066        Field(
1067            description=(
1068                "Maximum number of lines to return. "
1069                "Defaults to 4000 if not specified. "
1070                "If '0' is provided, no limit is applied."
1071            ),
1072            default=4000,
1073        ),
1074    ],
1075    from_tail: Annotated[
1076        bool | None,
1077        Field(
1078            description=(
1079                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
1080                "Defaults to True if `line_offset` is not specified. "
1081                "Cannot combine `from_tail=True` with `line_offset`."
1082            ),
1083            default=None,
1084        ),
1085    ],
1086    line_offset: Annotated[
1087        int | None,
1088        Field(
1089            description=(
1090                "Number of lines to skip from the beginning of the logs. "
1091                "Cannot be combined with `from_tail=True`."
1092            ),
1093            default=None,
1094        ),
1095    ],
1096) -> LogReadResult:
1097    """Get the logs from a sync job attempt on Airbyte Cloud."""
1098    # Validate that line_offset and from_tail are not both set
1099    if line_offset is not None and from_tail:
1100        raise PyAirbyteInputError(
1101            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
1102            context={"line_offset": line_offset, "from_tail": from_tail},
1103        )
1104
1105    if from_tail is None and line_offset is None:
1106        from_tail = True
1107    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1108    connection = workspace.get_connection(connection_id=connection_id)
1109
1110    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
1111
1112    if not sync_result:
1113        raise AirbyteMissingResourceError(
1114            resource_type="sync job",
1115            resource_name_or_id=connection_id,
1116        )
1117
1118    attempts = sync_result.get_attempts()
1119
1120    if not attempts:
1121        raise AirbyteMissingResourceError(
1122            resource_type="sync attempt",
1123            resource_name_or_id=str(sync_result.job_id),
1124        )
1125
1126    if attempt_number is not None:
1127        target_attempt = None
1128        for attempt in attempts:
1129            if attempt.attempt_number == attempt_number:
1130                target_attempt = attempt
1131                break
1132
1133        if target_attempt is None:
1134            raise AirbyteMissingResourceError(
1135                resource_type="sync attempt",
1136                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
1137            )
1138    else:
1139        target_attempt = max(attempts, key=lambda a: a.attempt_number)
1140
1141    logs = target_attempt.get_full_log_text()
1142
1143    if not logs:
1144        # Return empty result with zero lines
1145        return LogReadResult(
1146            log_text=(
1147                f"[No logs available for job '{sync_result.job_id}', "
1148                f"attempt {target_attempt.attempt_number}.]"
1149            ),
1150            log_text_start_line=1,
1151            log_text_line_count=0,
1152            total_log_lines_available=0,
1153            job_id=sync_result.job_id,
1154            attempt_number=target_attempt.attempt_number,
1155        )
1156
1157    # Apply line limiting
1158    log_lines = logs.splitlines()
1159    total_lines = len(log_lines)
1160
1161    # Determine effective max_lines (0 means no limit)
1162    effective_max = total_lines if max_lines == 0 else max_lines
1163
1164    # Calculate start_index and slice based on from_tail or line_offset
1165    if from_tail:
1166        start_index = max(0, total_lines - effective_max)
1167        selected_lines = log_lines[start_index:][:effective_max]
1168    else:
1169        start_index = line_offset or 0
1170        selected_lines = log_lines[start_index : start_index + effective_max]
1171
1172    return LogReadResult(
1173        log_text="\n".join(selected_lines),
1174        log_text_start_line=start_index + 1,  # Convert to 1-based index
1175        log_text_line_count=len(selected_lines),
1176        total_log_lines_available=total_lines,
1177        job_id=sync_result.job_id,
1178        attempt_number=target_attempt.attempt_number,
1179    )
1180
1181
1182@mcp_tool(
1183    read_only=True,
1184    idempotent=True,
1185    open_world=True,
1186    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1187)
1188def list_deployed_cloud_connections(
1189    ctx: Context,
1190    *,
1191    workspace_id: Annotated[
1192        str | None,
1193        Field(
1194            description=WORKSPACE_ID_TIP_TEXT,
1195            default=None,
1196        ),
1197    ],
1198    name_contains: Annotated[
1199        str | None,
1200        Field(
1201            description="Optional case-insensitive substring to filter connections by name",
1202            default=None,
1203        ),
1204    ],
1205    max_items_limit: Annotated[
1206        int | None,
1207        Field(
1208            description="Optional maximum number of items to return (default: no limit)",
1209            default=None,
1210        ),
1211    ],
1212    with_connection_status: Annotated[
1213        bool | None,
1214        Field(
1215            description="If True, include status info for each connection's most recent sync job",
1216            default=False,
1217        ),
1218    ],
1219    failing_connections_only: Annotated[
1220        bool | None,
1221        Field(
1222            description="If True, only return connections with failed/cancelled last sync",
1223            default=False,
1224        ),
1225    ],
1226) -> list[CloudConnectionResult]:
1227    """List all deployed connections in the Airbyte Cloud workspace.
1228
1229    When with_connection_status is True, each connection result will include
1230    information about the most recent sync job status, skipping over any
1231    currently in-progress syncs to find the last completed job.
1232
1233    When failing_connections_only is True, only connections where the most
1234    recent completed sync job failed or was cancelled will be returned.
1235    This implicitly enables with_connection_status.
1236    """
1237    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1238    connections = workspace.list_connections()
1239
1240    # Filter by name if requested
1241    if name_contains:
1242        needle = name_contains.lower()
1243        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1244
1245    # If failing_connections_only is True, implicitly enable with_connection_status
1246    if failing_connections_only:
1247        with_connection_status = True
1248
1249    results: list[CloudConnectionResult] = []
1250
1251    for connection in connections:
1252        last_job_status: str | None = None
1253        last_job_id: int | None = None
1254        last_job_time: str | None = None
1255        currently_running_job_id: int | None = None
1256        currently_running_job_start_time: str | None = None
1257
1258        if with_connection_status:
1259            sync_logs = connection.get_previous_sync_logs(limit=5)
1260            last_completed_job_status = None  # Keep enum for comparison
1261
1262            for sync_result in sync_logs:
1263                job_status = sync_result.get_job_status()
1264
1265                if not sync_result.is_job_complete():
1266                    currently_running_job_id = sync_result.job_id
1267                    currently_running_job_start_time = sync_result.start_time.isoformat()
1268                    continue
1269
1270                last_completed_job_status = job_status
1271                last_job_status = str(job_status.value) if job_status else None
1272                last_job_id = sync_result.job_id
1273                last_job_time = sync_result.start_time.isoformat()
1274                break
1275
1276            if failing_connections_only and (
1277                last_completed_job_status is None
1278                or last_completed_job_status not in FAILED_STATUSES
1279            ):
1280                continue
1281
1282        results.append(
1283            CloudConnectionResult(
1284                id=connection.connection_id,
1285                name=cast(str, connection.name),
1286                url=cast(str, connection.connection_url),
1287                source_id=connection.source_id,
1288                destination_id=connection.destination_id,
1289                last_job_status=last_job_status,
1290                last_job_id=last_job_id,
1291                last_job_time=last_job_time,
1292                currently_running_job_id=currently_running_job_id,
1293                currently_running_job_start_time=currently_running_job_start_time,
1294            )
1295        )
1296
1297        if max_items_limit is not None and len(results) >= max_items_limit:
1298            break
1299
1300    return results
1301
1302
1303def _resolve_organization(
1304    organization_id: str | None,
1305    organization_name: str | None,
1306    *,
1307    api_root: str,
1308    client_id: SecretString | None,
1309    client_secret: SecretString | None,
1310    bearer_token: SecretString | None = None,
1311) -> CloudOrganization:
1312    """Resolve organization from either ID or exact name match.
1313
1314    Args:
1315        organization_id: The organization ID (if provided directly)
1316        organization_name: The organization name (exact match required)
1317        api_root: The API root URL
1318        client_id: OAuth client ID (optional if bearer_token is provided)
1319        client_secret: OAuth client secret (optional if bearer_token is provided)
1320        bearer_token: Bearer token for authentication (optional if client credentials provided)
1321
1322    Returns:
1323        A CloudOrganization object with credentials for lazy loading of billing info.
1324
1325    Raises:
1326        PyAirbyteInputError: If neither or both parameters are provided,
1327            or if no organization matches the exact name
1328        AirbyteMissingResourceError: If the organization is not found
1329    """
1330    if organization_id and organization_name:
1331        raise PyAirbyteInputError(
1332            message="Provide either 'organization_id' or 'organization_name', not both."
1333        )
1334    if not organization_id and not organization_name:
1335        raise PyAirbyteInputError(
1336            message="Either 'organization_id' or 'organization_name' must be provided."
1337        )
1338
1339    # Get all organizations for the user
1340    orgs = api_util.list_organizations_for_user(
1341        api_root=api_root,
1342        client_id=client_id,
1343        client_secret=client_secret,
1344        bearer_token=bearer_token,
1345    )
1346
1347    org_response: api_util.models.OrganizationResponse | None = None
1348
1349    if organization_id:
1350        # Find by ID
1351        matching_orgs = [org for org in orgs if org.organization_id == organization_id]
1352        if not matching_orgs:
1353            raise AirbyteMissingResourceError(
1354                resource_type="organization",
1355                context={
1356                    "organization_id": organization_id,
1357                    "message": f"No organization found with ID '{organization_id}' "
1358                    "for the current user.",
1359                },
1360            )
1361        org_response = matching_orgs[0]
1362    else:
1363        # Find by exact name match (case-sensitive)
1364        matching_orgs = [org for org in orgs if org.organization_name == organization_name]
1365
1366        if not matching_orgs:
1367            raise AirbyteMissingResourceError(
1368                resource_type="organization",
1369                context={
1370                    "organization_name": organization_name,
1371                    "message": f"No organization found with exact name '{organization_name}' "
1372                    "for the current user.",
1373                },
1374            )
1375
1376        if len(matching_orgs) > 1:
1377            raise PyAirbyteInputError(
1378                message=f"Multiple organizations found with name '{organization_name}'. "
1379                "Please use 'organization_id' instead to specify the exact organization."
1380            )
1381
1382        org_response = matching_orgs[0]
1383
1384    # Return a CloudOrganization with credentials for lazy loading of billing info
1385    return CloudOrganization(
1386        organization_id=org_response.organization_id,
1387        organization_name=org_response.organization_name,
1388        email=org_response.email,
1389        api_root=api_root,
1390        client_id=client_id,
1391        client_secret=client_secret,
1392        bearer_token=bearer_token,
1393    )
1394
1395
1396def _resolve_organization_id(
1397    organization_id: str | None,
1398    organization_name: str | None,
1399    *,
1400    api_root: str,
1401    client_id: SecretString | None,
1402    client_secret: SecretString | None,
1403    bearer_token: SecretString | None = None,
1404) -> str:
1405    """Resolve organization ID from either ID or exact name match.
1406
1407    This is a convenience wrapper around _resolve_organization that returns just the ID.
1408    """
1409    org = _resolve_organization(
1410        organization_id=organization_id,
1411        organization_name=organization_name,
1412        api_root=api_root,
1413        client_id=client_id,
1414        client_secret=client_secret,
1415        bearer_token=bearer_token,
1416    )
1417    return org.organization_id
1418
1419
1420@mcp_tool(
1421    read_only=True,
1422    idempotent=True,
1423    open_world=True,
1424    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1425)
1426def list_cloud_workspaces(
1427    ctx: Context,
1428    *,
1429    organization_id: Annotated[
1430        str | None,
1431        Field(
1432            description="Organization ID. Required if organization_name is not provided.",
1433            default=None,
1434        ),
1435    ],
1436    organization_name: Annotated[
1437        str | None,
1438        Field(
1439            description=(
1440                "Organization name (exact match). " "Required if organization_id is not provided."
1441            ),
1442            default=None,
1443        ),
1444    ],
1445    name_contains: Annotated[
1446        str | None,
1447        Field(
1448            description="Optional substring to filter workspaces by name (server-side filtering)",
1449            default=None,
1450        ),
1451    ],
1452    max_items_limit: Annotated[
1453        int | None,
1454        Field(
1455            description="Optional maximum number of items to return (default: no limit)",
1456            default=None,
1457        ),
1458    ],
1459) -> list[CloudWorkspaceResult]:
1460    """List all workspaces in a specific organization.
1461
1462    Requires either organization_id OR organization_name (exact match) to be provided.
1463    This tool will NOT list workspaces across all organizations - you must specify
1464    which organization to list workspaces from.
1465    """
1466    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
1467    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
1468    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
1469    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
1470
1471    resolved_org_id = _resolve_organization_id(
1472        organization_id=organization_id,
1473        organization_name=organization_name,
1474        api_root=api_url,
1475        client_id=SecretString(client_id) if client_id else None,
1476        client_secret=SecretString(client_secret) if client_secret else None,
1477        bearer_token=SecretString(bearer_token) if bearer_token else None,
1478    )
1479
1480    workspaces = api_util.list_workspaces_in_organization(
1481        organization_id=resolved_org_id,
1482        api_root=api_url,
1483        client_id=SecretString(client_id) if client_id else None,
1484        client_secret=SecretString(client_secret) if client_secret else None,
1485        bearer_token=SecretString(bearer_token) if bearer_token else None,
1486        name_contains=name_contains,
1487        max_items_limit=max_items_limit,
1488    )
1489
1490    return [
1491        CloudWorkspaceResult(
1492            workspace_id=ws.get("workspaceId", ""),
1493            workspace_name=ws.get("name", ""),
1494            organization_id=ws.get("organizationId", ""),
1495        )
1496        for ws in workspaces
1497    ]
1498
1499
1500@mcp_tool(
1501    read_only=True,
1502    idempotent=True,
1503    open_world=True,
1504    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1505)
1506def describe_cloud_organization(
1507    ctx: Context,
1508    *,
1509    organization_id: Annotated[
1510        str | None,
1511        Field(
1512            description="Organization ID. Required if organization_name is not provided.",
1513            default=None,
1514        ),
1515    ],
1516    organization_name: Annotated[
1517        str | None,
1518        Field(
1519            description=(
1520                "Organization name (exact match). " "Required if organization_id is not provided."
1521            ),
1522            default=None,
1523        ),
1524    ],
1525) -> CloudOrganizationResult:
1526    """Get details about a specific organization including billing status.
1527
1528    Requires either organization_id OR organization_name (exact match) to be provided.
1529    This tool is useful for looking up an organization's ID from its name, or vice versa.
1530    """
1531    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
1532    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
1533    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
1534    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
1535
1536    org = _resolve_organization(
1537        organization_id=organization_id,
1538        organization_name=organization_name,
1539        api_root=api_url,
1540        client_id=SecretString(client_id) if client_id else None,
1541        client_secret=SecretString(client_secret) if client_secret else None,
1542        bearer_token=SecretString(bearer_token) if bearer_token else None,
1543    )
1544
1545    # CloudOrganization has lazy loading of billing properties
1546    return CloudOrganizationResult(
1547        id=org.organization_id,
1548        name=org.organization_name,
1549        email=org.email,
1550        payment_status=org.payment_status,
1551        subscription_status=org.subscription_status,
1552        is_account_locked=org.is_account_locked,
1553    )
1554
1555
1556def _get_custom_source_definition_description(
1557    custom_source: CustomCloudSourceDefinition,
1558) -> str:
1559    return "\n".join(
1560        [
1561            f" - Custom Source Name: {custom_source.name}",
1562            f" - Definition ID: {custom_source.definition_id}",
1563            f" - Definition Version: {custom_source.version}",
1564            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
1565            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
1566        ]
1567    )
1568
1569
1570@mcp_tool(
1571    open_world=True,
1572    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1573)
1574def publish_custom_source_definition(
1575    ctx: Context,
1576    name: Annotated[
1577        str,
1578        Field(description="The name for the custom connector definition."),
1579    ],
1580    *,
1581    workspace_id: Annotated[
1582        str | None,
1583        Field(
1584            description=WORKSPACE_ID_TIP_TEXT,
1585            default=None,
1586        ),
1587    ],
1588    manifest_yaml: Annotated[
1589        str | Path | None,
1590        Field(
1591            description=(
1592                "The Low-code CDK manifest as a YAML string or file path. "
1593                "Required for YAML connectors."
1594            ),
1595            default=None,
1596        ),
1597    ] = None,
1598    unique: Annotated[
1599        bool,
1600        Field(
1601            description="Whether to require a unique name.",
1602            default=True,
1603        ),
1604    ] = True,
1605    pre_validate: Annotated[
1606        bool,
1607        Field(
1608            description="Whether to validate the manifest client-side before publishing.",
1609            default=True,
1610        ),
1611    ] = True,
1612    testing_values: Annotated[
1613        dict | str | None,
1614        Field(
1615            description=(
1616                "Optional testing configuration values for the Builder UI. "
1617                "Can be provided as a JSON object or JSON string. "
1618                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1619                "If provided, these values replace any existing testing values "
1620                "for the connector builder project, allowing immediate test read operations."
1621            ),
1622            default=None,
1623        ),
1624    ],
1625    testing_values_secret_name: Annotated[
1626        str | None,
1627        Field(
1628            description=(
1629                "Optional name of a secret containing testing configuration values "
1630                "in JSON or YAML format. The secret will be resolved by the MCP "
1631                "server and merged into testing_values, with secret values taking "
1632                "precedence. This lets the agent reference secrets without sending "
1633                "raw values as tool arguments."
1634            ),
1635            default=None,
1636        ),
1637    ],
1638) -> str:
1639    """Publish a custom YAML source connector definition to Airbyte Cloud.
1640
1641    Note: Only YAML (declarative) connectors are currently supported.
1642    Docker-based custom sources are not yet available.
1643    """
1644    processed_manifest = manifest_yaml
1645    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1646        processed_manifest = Path(manifest_yaml)
1647
1648    # Resolve testing values from inline config and/or secret
1649    testing_values_dict: dict[str, Any] | None = None
1650    if testing_values is not None or testing_values_secret_name is not None:
1651        testing_values_dict = (
1652            resolve_connector_config(
1653                config=testing_values,
1654                config_secret_name=testing_values_secret_name,
1655            )
1656            or None
1657        )
1658
1659    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1660    custom_source = workspace.publish_custom_source_definition(
1661        name=name,
1662        manifest_yaml=processed_manifest,
1663        unique=unique,
1664        pre_validate=pre_validate,
1665        testing_values=testing_values_dict,
1666    )
1667    register_guid_created_in_session(custom_source.definition_id)
1668    return (
1669        "Successfully published custom YAML source definition:\n"
1670        + _get_custom_source_definition_description(
1671            custom_source=custom_source,
1672        )
1673        + "\n"
1674    )
1675
1676
1677@mcp_tool(
1678    read_only=True,
1679    idempotent=True,
1680    open_world=True,
1681)
1682def list_custom_source_definitions(
1683    ctx: Context,
1684    *,
1685    workspace_id: Annotated[
1686        str | None,
1687        Field(
1688            description=WORKSPACE_ID_TIP_TEXT,
1689            default=None,
1690        ),
1691    ],
1692) -> list[dict[str, Any]]:
1693    """List custom YAML source definitions in the Airbyte Cloud workspace.
1694
1695    Note: Only YAML (declarative) connectors are currently supported.
1696    Docker-based custom sources are not yet available.
1697    """
1698    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1699    definitions = workspace.list_custom_source_definitions(
1700        definition_type="yaml",
1701    )
1702
1703    return [
1704        {
1705            "definition_id": d.definition_id,
1706            "name": d.name,
1707            "version": d.version,
1708            "connector_builder_project_url": d.connector_builder_project_url,
1709        }
1710        for d in definitions
1711    ]
1712
1713
1714@mcp_tool(
1715    read_only=True,
1716    idempotent=True,
1717    open_world=True,
1718)
1719def get_custom_source_definition(
1720    ctx: Context,
1721    definition_id: Annotated[
1722        str,
1723        Field(description="The ID of the custom source definition to retrieve."),
1724    ],
1725    *,
1726    workspace_id: Annotated[
1727        str | None,
1728        Field(
1729            description=WORKSPACE_ID_TIP_TEXT,
1730            default=None,
1731        ),
1732    ],
1733    include_draft: Annotated[
1734        bool,
1735        Field(
1736            description=(
1737                "Whether to include the Connector Builder draft manifest in the response. "
1738                "If True and a draft exists, the response will include 'has_draft' and "
1739                "'draft_manifest' fields. Defaults to False."
1740            ),
1741            default=False,
1742        ),
1743    ] = False,
1744) -> dict[str, Any]:
1745    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.
1746
1747    Returns the full definition details including the published manifest YAML content.
1748    Optionally includes the Connector Builder draft manifest (unpublished changes)
1749    when include_draft=True.
1750
1751    Note: Only YAML (declarative) connectors are currently supported.
1752    Docker-based custom sources are not yet available.
1753    """
1754    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1755    definition = workspace.get_custom_source_definition(
1756        definition_id=definition_id,
1757        definition_type="yaml",
1758    )
1759
1760    result: dict[str, Any] = {
1761        "definition_id": definition.definition_id,
1762        "name": definition.name,
1763        "version": definition.version,
1764        "connector_builder_project_id": definition.connector_builder_project_id,
1765        "connector_builder_project_url": definition.connector_builder_project_url,
1766        "manifest": definition.manifest,
1767    }
1768
1769    if include_draft:
1770        result["has_draft"] = definition.has_draft
1771        result["draft_manifest"] = definition.draft_manifest
1772
1773    return result
1774
1775
1776@mcp_tool(
1777    read_only=True,
1778    idempotent=True,
1779    open_world=True,
1780)
1781def get_connector_builder_draft_manifest(
1782    ctx: Context,
1783    definition_id: Annotated[
1784        str,
1785        Field(description="The ID of the custom source definition to retrieve the draft for."),
1786    ],
1787    *,
1788    workspace_id: Annotated[
1789        str | None,
1790        Field(
1791            description=WORKSPACE_ID_TIP_TEXT,
1792            default=None,
1793        ),
1794    ],
1795) -> dict[str, Any]:
1796    """Get the Connector Builder draft manifest for a custom source definition.
1797
1798    Returns the working draft manifest that has been saved in the Connector Builder UI
1799    but not yet published. This is useful for inspecting what a user is currently working
1800    on before they publish their changes.
1801
1802    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
1803    The published manifest is always included for comparison.
1804    """
1805    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1806    definition = workspace.get_custom_source_definition(
1807        definition_id=definition_id,
1808        definition_type="yaml",
1809    )
1810
1811    return {
1812        "definition_id": definition.definition_id,
1813        "name": definition.name,
1814        "connector_builder_project_id": definition.connector_builder_project_id,
1815        "connector_builder_project_url": definition.connector_builder_project_url,
1816        "has_draft": definition.has_draft,
1817        "draft_manifest": definition.draft_manifest,
1818        "published_manifest": definition.manifest,
1819    }
1820
1821
1822@mcp_tool(
1823    destructive=True,
1824    open_world=True,
1825)
1826def update_custom_source_definition(
1827    ctx: Context,
1828    definition_id: Annotated[
1829        str,
1830        Field(description="The ID of the definition to update."),
1831    ],
1832    manifest_yaml: Annotated[
1833        str | Path | None,
1834        Field(
1835            description=(
1836                "New manifest as YAML string or file path. "
1837                "Optional; omit to update only testing values."
1838            ),
1839            default=None,
1840        ),
1841    ] = None,
1842    *,
1843    workspace_id: Annotated[
1844        str | None,
1845        Field(
1846            description=WORKSPACE_ID_TIP_TEXT,
1847            default=None,
1848        ),
1849    ],
1850    pre_validate: Annotated[
1851        bool,
1852        Field(
1853            description="Whether to validate the manifest client-side before updating.",
1854            default=True,
1855        ),
1856    ] = True,
1857    testing_values: Annotated[
1858        dict | str | None,
1859        Field(
1860            description=(
1861                "Optional testing configuration values for the Builder UI. "
1862                "Can be provided as a JSON object or JSON string. "
1863                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1864                "If provided, these values replace any existing testing values "
1865                "for the connector builder project. The entire testing values object "
1866                "is overwritten, so pass the full set of values you want to persist."
1867            ),
1868            default=None,
1869        ),
1870    ],
1871    testing_values_secret_name: Annotated[
1872        str | None,
1873        Field(
1874            description=(
1875                "Optional name of a secret containing testing configuration values "
1876                "in JSON or YAML format. The secret will be resolved by the MCP "
1877                "server and merged into testing_values, with secret values taking "
1878                "precedence. This lets the agent reference secrets without sending "
1879                "raw values as tool arguments."
1880            ),
1881            default=None,
1882        ),
1883    ],
1884) -> str:
1885    """Update a custom YAML source definition in Airbyte Cloud.
1886
1887    Updates the manifest and/or testing values for an existing custom source definition.
1888    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1889    """
1890    check_guid_created_in_session(definition_id)
1891
1892    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1893
1894    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
1895        raise PyAirbyteInputError(
1896            message=(
1897                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
1898                "must be provided to update a custom source definition."
1899            ),
1900            context={
1901                "definition_id": definition_id,
1902                "workspace_id": workspace.workspace_id,
1903            },
1904        )
1905
1906    processed_manifest: str | Path | None = manifest_yaml
1907    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1908        processed_manifest = Path(manifest_yaml)
1909
1910    # Resolve testing values from inline config and/or secret
1911    testing_values_dict: dict[str, Any] | None = None
1912    if testing_values is not None or testing_values_secret_name is not None:
1913        testing_values_dict = (
1914            resolve_connector_config(
1915                config=testing_values,
1916                config_secret_name=testing_values_secret_name,
1917            )
1918            or None
1919        )
1920
1921    definition = workspace.get_custom_source_definition(
1922        definition_id=definition_id,
1923        definition_type="yaml",
1924    )
1925    custom_source: CustomCloudSourceDefinition = definition
1926
1927    if processed_manifest is not None:
1928        custom_source = definition.update_definition(
1929            manifest_yaml=processed_manifest,
1930            pre_validate=pre_validate,
1931        )
1932
1933    if testing_values_dict is not None:
1934        custom_source.set_testing_values(testing_values_dict)
1935
1936    return (
1937        "Successfully updated custom YAML source definition:\n"
1938        + _get_custom_source_definition_description(
1939            custom_source=custom_source,
1940        )
1941    )
1942
1943
1944@mcp_tool(
1945    destructive=True,
1946    open_world=True,
1947)
1948def permanently_delete_custom_source_definition(
1949    ctx: Context,
1950    definition_id: Annotated[
1951        str,
1952        Field(description="The ID of the custom source definition to delete."),
1953    ],
1954    name: Annotated[
1955        str,
1956        Field(description="The expected name of the custom source definition (for verification)."),
1957    ],
1958    *,
1959    workspace_id: Annotated[
1960        str | None,
1961        Field(
1962            description=WORKSPACE_ID_TIP_TEXT,
1963            default=None,
1964        ),
1965    ],
1966) -> str:
1967    """Permanently delete a custom YAML source definition from Airbyte Cloud.
1968
1969    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
1970    (case insensitive).
1971
1972    If the connector does not meet this requirement, the deletion will be rejected with a
1973    helpful error message. Instruct the user to rename the connector appropriately to authorize
1974    the deletion.
1975
1976    The provided name must match the actual name of the definition for the operation to proceed.
1977    This is a safety measure to ensure you are deleting the correct resource.
1978
1979    Note: Only YAML (declarative) connectors are currently supported.
1980    Docker-based custom sources are not yet available.
1981    """
1982    check_guid_created_in_session(definition_id)
1983    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1984    definition = workspace.get_custom_source_definition(
1985        definition_id=definition_id,
1986        definition_type="yaml",
1987    )
1988    actual_name: str = definition.name
1989
1990    # Verify the name matches
1991    if actual_name != name:
1992        raise PyAirbyteInputError(
1993            message=(
1994                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1995                "The provided name must exactly match the definition's actual name. "
1996                "This is a safety measure to prevent accidental deletion."
1997            ),
1998            context={
1999                "definition_id": definition_id,
2000                "expected_name": name,
2001                "actual_name": actual_name,
2002            },
2003        )
2004
2005    definition.permanently_delete(
2006        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
2007    )
2008    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
2009
2010
2011@mcp_tool(
2012    destructive=True,
2013    open_world=True,
2014    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2015)
2016def permanently_delete_cloud_source(
2017    ctx: Context,
2018    source_id: Annotated[
2019        str,
2020        Field(description="The ID of the deployed source to delete."),
2021    ],
2022    name: Annotated[
2023        str,
2024        Field(description="The expected name of the source (for verification)."),
2025    ],
2026) -> str:
2027    """Permanently delete a deployed source connector from Airbyte Cloud.
2028
2029    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
2030    (case insensitive).
2031
2032    If the source does not meet this requirement, the deletion will be rejected with a
2033    helpful error message. Instruct the user to rename the source appropriately to authorize
2034    the deletion.
2035
2036    The provided name must match the actual name of the source for the operation to proceed.
2037    This is a safety measure to ensure you are deleting the correct resource.
2038    """
2039    check_guid_created_in_session(source_id)
2040    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
2041    source = workspace.get_source(source_id=source_id)
2042    actual_name: str = cast(str, source.name)
2043
2044    # Verify the name matches
2045    if actual_name != name:
2046        raise PyAirbyteInputError(
2047            message=(
2048                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2049                "The provided name must exactly match the source's actual name. "
2050                "This is a safety measure to prevent accidental deletion."
2051            ),
2052            context={
2053                "source_id": source_id,
2054                "expected_name": name,
2055                "actual_name": actual_name,
2056            },
2057        )
2058
2059    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2060    workspace.permanently_delete_source(
2061        source=source_id,
2062        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
2063    )
2064    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
2065
2066
2067@mcp_tool(
2068    destructive=True,
2069    open_world=True,
2070    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2071)
2072def permanently_delete_cloud_destination(
2073    ctx: Context,
2074    destination_id: Annotated[
2075        str,
2076        Field(description="The ID of the deployed destination to delete."),
2077    ],
2078    name: Annotated[
2079        str,
2080        Field(description="The expected name of the destination (for verification)."),
2081    ],
2082) -> str:
2083    """Permanently delete a deployed destination connector from Airbyte Cloud.
2084
2085    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
2086    (case insensitive).
2087
2088    If the destination does not meet this requirement, the deletion will be rejected with a
2089    helpful error message. Instruct the user to rename the destination appropriately to authorize
2090    the deletion.
2091
2092    The provided name must match the actual name of the destination for the operation to proceed.
2093    This is a safety measure to ensure you are deleting the correct resource.
2094    """
2095    check_guid_created_in_session(destination_id)
2096    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
2097    destination = workspace.get_destination(destination_id=destination_id)
2098    actual_name: str = cast(str, destination.name)
2099
2100    # Verify the name matches
2101    if actual_name != name:
2102        raise PyAirbyteInputError(
2103            message=(
2104                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2105                "The provided name must exactly match the destination's actual name. "
2106                "This is a safety measure to prevent accidental deletion."
2107            ),
2108            context={
2109                "destination_id": destination_id,
2110                "expected_name": name,
2111                "actual_name": actual_name,
2112            },
2113        )
2114
2115    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2116    workspace.permanently_delete_destination(
2117        destination=destination_id,
2118        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
2119    )
2120    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
2121
2122
2123@mcp_tool(
2124    destructive=True,
2125    open_world=True,
2126    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2127)
2128def permanently_delete_cloud_connection(
2129    ctx: Context,
2130    connection_id: Annotated[
2131        str,
2132        Field(description="The ID of the connection to delete."),
2133    ],
2134    name: Annotated[
2135        str,
2136        Field(description="The expected name of the connection (for verification)."),
2137    ],
2138    *,
2139    cascade_delete_source: Annotated[
2140        bool,
2141        Field(
2142            description=(
2143                "Whether to also delete the source connector associated with this connection."
2144            ),
2145            default=False,
2146        ),
2147    ] = False,
2148    cascade_delete_destination: Annotated[
2149        bool,
2150        Field(
2151            description=(
2152                "Whether to also delete the destination connector associated with this connection."
2153            ),
2154            default=False,
2155        ),
2156    ] = False,
2157) -> str:
2158    """Permanently delete a connection from Airbyte Cloud.
2159
2160    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
2161    (case insensitive).
2162
2163    If the connection does not meet this requirement, the deletion will be rejected with a
2164    helpful error message. Instruct the user to rename the connection appropriately to authorize
2165    the deletion.
2166
2167    The provided name must match the actual name of the connection for the operation to proceed.
2168    This is a safety measure to ensure you are deleting the correct resource.
2169    """
2170    check_guid_created_in_session(connection_id)
2171    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
2172    connection = workspace.get_connection(connection_id=connection_id)
2173    actual_name: str = cast(str, connection.name)
2174
2175    # Verify the name matches
2176    if actual_name != name:
2177        raise PyAirbyteInputError(
2178            message=(
2179                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2180                "The provided name must exactly match the connection's actual name. "
2181                "This is a safety measure to prevent accidental deletion."
2182            ),
2183            context={
2184                "connection_id": connection_id,
2185                "expected_name": name,
2186                "actual_name": actual_name,
2187            },
2188        )
2189
2190    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2191    workspace.permanently_delete_connection(
2192        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
2193        connection=connection_id,
2194        cascade_delete_source=cascade_delete_source,
2195        cascade_delete_destination=cascade_delete_destination,
2196    )
2197    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
2198
2199
2200@mcp_tool(
2201    open_world=True,
2202    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2203)
2204def rename_cloud_source(
2205    ctx: Context,
2206    source_id: Annotated[
2207        str,
2208        Field(description="The ID of the deployed source to rename."),
2209    ],
2210    name: Annotated[
2211        str,
2212        Field(description="New name for the source."),
2213    ],
2214    *,
2215    workspace_id: Annotated[
2216        str | None,
2217        Field(
2218            description=WORKSPACE_ID_TIP_TEXT,
2219            default=None,
2220        ),
2221    ],
2222) -> str:
2223    """Rename a deployed source connector on Airbyte Cloud."""
2224    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2225    source = workspace.get_source(source_id=source_id)
2226    source.rename(name=name)
2227    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
2228
2229
2230@mcp_tool(
2231    destructive=True,
2232    open_world=True,
2233    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2234)
2235def update_cloud_source_config(
2236    ctx: Context,
2237    source_id: Annotated[
2238        str,
2239        Field(description="The ID of the deployed source to update."),
2240    ],
2241    config: Annotated[
2242        dict | str,
2243        Field(
2244            description="New configuration for the source connector.",
2245        ),
2246    ],
2247    config_secret_name: Annotated[
2248        str | None,
2249        Field(
2250            description="The name of the secret containing the configuration.",
2251            default=None,
2252        ),
2253    ] = None,
2254    *,
2255    workspace_id: Annotated[
2256        str | None,
2257        Field(
2258            description=WORKSPACE_ID_TIP_TEXT,
2259            default=None,
2260        ),
2261    ],
2262) -> str:
2263    """Update a deployed source connector's configuration on Airbyte Cloud.
2264
2265    This is a destructive operation that can break existing connections if the
2266    configuration is changed incorrectly. Use with caution.
2267    """
2268    check_guid_created_in_session(source_id)
2269    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2270    source = workspace.get_source(source_id=source_id)
2271
2272    config_dict = resolve_connector_config(
2273        config=config,
2274        config_secret_name=config_secret_name,
2275        config_spec_jsonschema=None,  # We don't have the spec here
2276    )
2277
2278    source.update_config(config=config_dict)
2279    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
2280
2281
2282@mcp_tool(
2283    open_world=True,
2284    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2285)
2286def rename_cloud_destination(
2287    ctx: Context,
2288    destination_id: Annotated[
2289        str,
2290        Field(description="The ID of the deployed destination to rename."),
2291    ],
2292    name: Annotated[
2293        str,
2294        Field(description="New name for the destination."),
2295    ],
2296    *,
2297    workspace_id: Annotated[
2298        str | None,
2299        Field(
2300            description=WORKSPACE_ID_TIP_TEXT,
2301            default=None,
2302        ),
2303    ],
2304) -> str:
2305    """Rename a deployed destination connector on Airbyte Cloud."""
2306    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2307    destination = workspace.get_destination(destination_id=destination_id)
2308    destination.rename(name=name)
2309    return (
2310        f"Successfully renamed destination '{destination_id}' to '{name}'. "
2311        f"URL: {destination.connector_url}"
2312    )
2313
2314
2315@mcp_tool(
2316    destructive=True,
2317    open_world=True,
2318    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2319)
2320def update_cloud_destination_config(
2321    ctx: Context,
2322    destination_id: Annotated[
2323        str,
2324        Field(description="The ID of the deployed destination to update."),
2325    ],
2326    config: Annotated[
2327        dict | str,
2328        Field(
2329            description="New configuration for the destination connector.",
2330        ),
2331    ],
2332    config_secret_name: Annotated[
2333        str | None,
2334        Field(
2335            description="The name of the secret containing the configuration.",
2336            default=None,
2337        ),
2338    ],
2339    *,
2340    workspace_id: Annotated[
2341        str | None,
2342        Field(
2343            description=WORKSPACE_ID_TIP_TEXT,
2344            default=None,
2345        ),
2346    ],
2347) -> str:
2348    """Update a deployed destination connector's configuration on Airbyte Cloud.
2349
2350    This is a destructive operation that can break existing connections if the
2351    configuration is changed incorrectly. Use with caution.
2352    """
2353    check_guid_created_in_session(destination_id)
2354    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2355    destination = workspace.get_destination(destination_id=destination_id)
2356
2357    config_dict = resolve_connector_config(
2358        config=config,
2359        config_secret_name=config_secret_name,
2360        config_spec_jsonschema=None,  # We don't have the spec here
2361    )
2362
2363    destination.update_config(config=config_dict)
2364    return (
2365        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
2366    )
2367
2368
2369@mcp_tool(
2370    open_world=True,
2371    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2372)
2373def rename_cloud_connection(
2374    ctx: Context,
2375    connection_id: Annotated[
2376        str,
2377        Field(description="The ID of the connection to rename."),
2378    ],
2379    name: Annotated[
2380        str,
2381        Field(description="New name for the connection."),
2382    ],
2383    *,
2384    workspace_id: Annotated[
2385        str | None,
2386        Field(
2387            description=WORKSPACE_ID_TIP_TEXT,
2388            default=None,
2389        ),
2390    ],
2391) -> str:
2392    """Rename a connection on Airbyte Cloud."""
2393    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2394    connection = workspace.get_connection(connection_id=connection_id)
2395    connection.rename(name=name)
2396    return (
2397        f"Successfully renamed connection '{connection_id}' to '{name}'. "
2398        f"URL: {connection.connection_url}"
2399    )
2400
2401
2402@mcp_tool(
2403    destructive=True,
2404    open_world=True,
2405    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2406)
2407def set_cloud_connection_table_prefix(
2408    ctx: Context,
2409    connection_id: Annotated[
2410        str,
2411        Field(description="The ID of the connection to update."),
2412    ],
2413    prefix: Annotated[
2414        str,
2415        Field(description="New table prefix to use when syncing to the destination."),
2416    ],
2417    *,
2418    workspace_id: Annotated[
2419        str | None,
2420        Field(
2421            description=WORKSPACE_ID_TIP_TEXT,
2422            default=None,
2423        ),
2424    ],
2425) -> str:
2426    """Set the table prefix for a connection on Airbyte Cloud.
2427
2428    This is a destructive operation that can break downstream dependencies if the
2429    table prefix is changed incorrectly. Use with caution.
2430    """
2431    check_guid_created_in_session(connection_id)
2432    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2433    connection = workspace.get_connection(connection_id=connection_id)
2434    connection.set_table_prefix(prefix=prefix)
2435    return (
2436        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
2437        f"URL: {connection.connection_url}"
2438    )
2439
2440
2441@mcp_tool(
2442    destructive=True,
2443    open_world=True,
2444    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2445)
2446def set_cloud_connection_selected_streams(
2447    ctx: Context,
2448    connection_id: Annotated[
2449        str,
2450        Field(description="The ID of the connection to update."),
2451    ],
2452    stream_names: Annotated[
2453        str | list[str],
2454        Field(
2455            description=(
2456                "The selected stream names to sync within the connection. "
2457                "Must be an explicit stream name or list of streams."
2458            )
2459        ),
2460    ],
2461    *,
2462    workspace_id: Annotated[
2463        str | None,
2464        Field(
2465            description=WORKSPACE_ID_TIP_TEXT,
2466            default=None,
2467        ),
2468    ],
2469) -> str:
2470    """Set the selected streams for a connection on Airbyte Cloud.
2471
2472    This is a destructive operation that can break existing connections if the
2473    stream selection is changed incorrectly. Use with caution.
2474    """
2475    check_guid_created_in_session(connection_id)
2476    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2477    connection = workspace.get_connection(connection_id=connection_id)
2478
2479    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
2480    connection.set_selected_streams(stream_names=resolved_streams_list)
2481
2482    return (
2483        f"Successfully set selected streams for connection '{connection_id}' "
2484        f"to {resolved_streams_list}. URL: {connection.connection_url}"
2485    )
2486
2487
2488@mcp_tool(
2489    open_world=True,
2490    destructive=True,
2491    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2492)
2493def update_cloud_connection(
2494    ctx: Context,
2495    connection_id: Annotated[
2496        str,
2497        Field(description="The ID of the connection to update."),
2498    ],
2499    *,
2500    enabled: Annotated[
2501        bool | None,
2502        Field(
2503            description=(
2504                "Set the connection's enabled status. "
2505                "True enables the connection (status='active'), "
2506                "False disables it (status='inactive'). "
2507                "Leave unset to keep the current status."
2508            ),
2509            default=None,
2510        ),
2511    ],
2512    cron_expression: Annotated[
2513        str | None,
2514        Field(
2515            description=(
2516                "A cron expression defining when syncs should run. "
2517                "Examples: '0 0 * * *' (daily at midnight UTC), "
2518                "'0 */6 * * *' (every 6 hours), "
2519                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
2520                "Leave unset to keep the current schedule. "
2521                "Cannot be used together with 'manual_schedule'."
2522            ),
2523            default=None,
2524        ),
2525    ],
2526    manual_schedule: Annotated[
2527        bool | None,
2528        Field(
2529            description=(
2530                "Set to True to disable automatic syncs (manual scheduling only). "
2531                "Syncs will only run when manually triggered. "
2532                "Cannot be used together with 'cron_expression'."
2533            ),
2534            default=None,
2535        ),
2536    ],
2537    workspace_id: Annotated[
2538        str | None,
2539        Field(
2540            description=WORKSPACE_ID_TIP_TEXT,
2541            default=None,
2542        ),
2543    ],
2544) -> str:
2545    """Update a connection's settings on Airbyte Cloud.
2546
2547    This tool allows updating multiple connection settings in a single call:
2548    - Enable or disable the connection
2549    - Set a cron schedule for automatic syncs
2550    - Switch to manual scheduling (no automatic syncs)
2551
2552    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
2553    parameters are mutually exclusive.
2554    """
2555    check_guid_created_in_session(connection_id)
2556
2557    # Validate that at least one setting is provided
2558    if enabled is None and cron_expression is None and manual_schedule is None:
2559        raise ValueError(
2560            "At least one setting must be provided: 'enabled', 'cron_expression', "
2561            "or 'manual_schedule'."
2562        )
2563
2564    # Validate mutually exclusive schedule options
2565    if cron_expression is not None and manual_schedule is True:
2566        raise ValueError(
2567            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
2568            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
2569            "for manual-only syncs."
2570        )
2571
2572    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2573    connection = workspace.get_connection(connection_id=connection_id)
2574
2575    changes_made: list[str] = []
2576
2577    # Apply enabled status change
2578    if enabled is not None:
2579        connection.set_enabled(enabled=enabled)
2580        status_str = "enabled" if enabled else "disabled"
2581        changes_made.append(f"status set to '{status_str}'")
2582
2583    # Apply schedule change
2584    if cron_expression is not None:
2585        connection.set_schedule(cron_expression=cron_expression)
2586        changes_made.append(f"schedule set to '{cron_expression}'")
2587    elif manual_schedule is True:
2588        connection.set_manual_schedule()
2589        changes_made.append("schedule set to 'manual'")
2590
2591    changes_summary = ", ".join(changes_made)
2592    return (
2593        f"Successfully updated connection '{connection_id}': {changes_summary}. "
2594        f"URL: {connection.connection_url}"
2595    )
2596
2597
2598@mcp_tool(
2599    read_only=True,
2600    idempotent=True,
2601    open_world=True,
2602    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2603)
2604def get_connection_artifact(
2605    ctx: Context,
2606    connection_id: Annotated[
2607        str,
2608        Field(description="The ID of the Airbyte Cloud connection."),
2609    ],
2610    artifact_type: Annotated[
2611        Literal["state", "catalog"],
2612        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
2613    ],
2614    *,
2615    workspace_id: Annotated[
2616        str | None,
2617        Field(
2618            description=WORKSPACE_ID_TIP_TEXT,
2619            default=None,
2620        ),
2621    ],
2622) -> dict[str, Any]:
2623    """Get a connection artifact (state or catalog) from Airbyte Cloud.
2624
2625    Retrieves the specified artifact for a connection:
2626    - 'state': Returns the full raw connection state including stateType and all
2627      state data, or {"ERROR": "..."} if no state is set.
2628    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
2629      or {"ERROR": "..."} if not found.
2630    """
2631    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
2632    connection = workspace.get_connection(connection_id=connection_id)
2633
2634    if artifact_type == "state":
2635        result = connection.dump_raw_state()
2636        if result.get("stateType") == "not_set":
2637            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
2638        return result
2639
2640    # artifact_type == "catalog"
2641    result = connection.dump_raw_catalog()
2642    if result is None:
2643        return {"ERROR": "No catalog found for this connection"}
2644    return result
2645
2646
2647def _add_defaults_for_exclude_args(
2648    exclude_args: list[str],
2649) -> None:
2650    """Patch registered tool functions to add Python-level defaults for excluded args.
2651
2652    FastMCP requires that excluded args have Python-level default values, but MCP tool
2653    functions should only use Field(default=...) in their Annotated type hints (not
2654    Python-level `= None`). This function bridges the gap by dynamically adding Python
2655    defaults to the function signatures at registration time, so the source code stays
2656    clean while satisfying FastMCP's requirement.
2657
2658    Args:
2659        exclude_args: List of argument names that will be excluded from the tool schema.
2660    """
2661    import inspect  # noqa: PLC0415  # Local import for optional patching logic
2662
2663    from fastmcp_extensions.decorators import (  # noqa: PLC0415
2664        _REGISTERED_TOOLS,  # noqa: PLC2701
2665    )
2666
2667    for func, _annotations in _REGISTERED_TOOLS:
2668        sig = inspect.signature(func)
2669        needs_patch = any(
2670            arg_name in sig.parameters
2671            and sig.parameters[arg_name].default is inspect.Parameter.empty
2672            for arg_name in exclude_args
2673        )
2674        if needs_patch:
2675            new_params = [
2676                p.replace(default=None)
2677                if name in exclude_args and p.default is inspect.Parameter.empty
2678                else p
2679                for name, p in sig.parameters.items()
2680            ]
2681            func.__signature__ = sig.replace(parameters=new_params)  # type: ignore[attr-defined]
2682
2683
2684def register_cloud_tools(app: FastMCP) -> None:
2685    """Register cloud tools with the FastMCP app.
2686
2687    Args:
2688        app: FastMCP application instance
2689    """
2690    exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
2691    if exclude_args:
2692        _add_defaults_for_exclude_args(exclude_args)
2693    register_mcp_tools(
2694        app,
2695        mcp_module=__name__,
2696        exclude_args=exclude_args,
2697    )