airbyte.mcp.cloud
Airbyte Cloud MCP operations.
cloud module
MCP primitives registered by the cloud module of the airbyte-mcp server: 35 tools, 0 prompts, 0 resources.
Tools (35)
check_airbyte_cloud_workspace
Hints: read-only · idempotent · open-world
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace details including workspace ID, name, organization info, and billing status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Information about a workspace in Airbyte Cloud.",
"properties": {
"workspace_id": {
"type": "string"
},
"workspace_name": {
"type": "string"
},
"workspace_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"organization_id": {
"type": "string"
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"workspace_id",
"workspace_name",
"organization_id"
],
"type": "object"
}
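Because this tool relies on three environment variables by default, it can help to confirm they are set before calling it. The following is a minimal sketch, not part of the airbyte-mcp server: `missing_cloud_env_vars` is a hypothetical helper, and only the variable names come from the description above.

```python
from typing import Mapping

# Variable names are taken from the tool description above.
REQUIRED_ENV_VARS = (
    "AIRBYTE_CLOUD_CLIENT_ID",
    "AIRBYTE_CLOUD_CLIENT_SECRET",
    "AIRBYTE_CLOUD_WORKSPACE_ID",
)


def missing_cloud_env_vars(env: Mapping[str, str]) -> list[str]:
    """Hypothetical helper: return the names that are unset or empty in env."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

Passing `os.environ` as `env` checks the current process. An empty list means all three variables are present; it does not prove the credentials are valid, which is what the tool itself checks.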
create_connection_on_cloud
Hints: open-world
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_name | string | yes | — | The name of the connection. |
| source_id | string | yes | — | The ID of the deployed source. |
| destination_id | string | yes | — | The ID of the deployed destination. |
| selected_streams | string \| array<string> | yes | — | The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| table_prefix | string \| null | no | null | Optional table prefix to use when syncing to the destination. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_name": {
"description": "The name of the connection.",
"type": "string"
},
"source_id": {
"description": "The ID of the deployed source.",
"type": "string"
},
"destination_id": {
"description": "The ID of the deployed destination.",
"type": "string"
},
"selected_streams": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
],
"description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"table_prefix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional table prefix to use when syncing to the destination."
}
},
"required": [
"connection_name",
"source_id",
"destination_id",
"selected_streams"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
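The `selected_streams` constraint (an explicit name or list of names, never empty and never '*') can be checked on the client before the tool is called. This is a sketch under assumptions: `normalize_selected_streams` is a hypothetical helper, and it assumes a bare string names exactly one stream, which the server may interpret differently.

```python
from typing import Sequence, Union


def normalize_selected_streams(selected: Union[str, Sequence[str]]) -> list[str]:
    """Hypothetical pre-flight check mirroring the documented constraints."""
    # Assumption: a bare string names exactly one stream.
    names = [selected] if isinstance(selected, str) else list(selected)
    names = [name.strip() for name in names]
    if not names or any(name == "" for name in names):
        raise ValueError("selected_streams must name at least one stream")
    if "*" in names:
        raise ValueError("selected_streams must be explicit; '*' is not allowed")
    return names
```

The returned list can be passed as the `selected_streams` argument. Rejecting bad input locally avoids a round trip to Airbyte Cloud for a request the description says cannot succeed.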
deploy_destination_to_cloud
Hints: open-world
Deploy a destination connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_name | string | yes | — | The name to use when deploying the destination. |
| destination_connector_name | string | yes | — | The name of the destination connector (e.g., 'destination-postgres'). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| config | object \| string \| null | no | null | The configuration for the destination connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| unique | boolean | no | true | Whether to require a unique name. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_name": {
"description": "The name to use when deploying the destination.",
"type": "string"
},
"destination_connector_name": {
"description": "The name of the destination connector (e.g., 'destination-postgres').",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the destination connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
}
},
"required": [
"destination_name",
"destination_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
deploy_noop_destination_to_cloud
Hints: open-world
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | no | "No-op Destination" | |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| unique | boolean | no | true | |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name": {
"default": "No-op Destination",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"unique": {
"default": true,
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
deploy_source_to_cloud
Hints: open-world
Deploy a source connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_name | string | yes | — | The name to use when deploying the source. |
| source_connector_name | string | yes | — | The name of the source connector (e.g., 'source-faker'). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| config | object \| string \| null | no | null | The configuration for the source connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| unique | boolean | no | true | Whether to require a unique name. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_name": {
"description": "The name to use when deploying the source.",
"type": "string"
},
"source_connector_name": {
"description": "The name of the source connector (e.g., 'source-faker').",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
}
},
"required": [
"source_name",
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
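MCP clients invoke a tool by sending a JSON-RPC `tools/call` request whose `params` carry the tool name and an `arguments` object matching the input schema. The sketch below builds such a request for `deploy_source_to_cloud`. The helper name `build_deploy_source_call`, the source name, and the `count` config key are illustrative assumptions; real config keys depend on the connector.

```python
import json
from typing import Any, Optional


def build_deploy_source_call(
    source_name: str,
    source_connector_name: str,
    config: Optional[dict[str, Any]] = None,
    config_secret_name: Optional[str] = None,
    request_id: int = 1,
) -> dict[str, Any]:
    """Hypothetical helper: wrap the tool arguments in an MCP tools/call request."""
    arguments: dict[str, Any] = {
        "source_name": source_name,
        "source_connector_name": source_connector_name,
    }
    # Optional fields are omitted when unset so the schema defaults apply.
    if config is not None:
        arguments["config"] = config
    if config_secret_name is not None:
        arguments["config_secret_name"] = config_secret_name
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": "deploy_source_to_cloud", "arguments": arguments},
    }


request = build_deploy_source_call(
    source_name="Faker (docs example)",  # illustrative name
    source_connector_name="source-faker",
    config={"count": 100},  # illustrative config; real keys depend on the connector
)
print(json.dumps(request, indent=2))
```

Most MCP client libraries build this envelope for you, so in practice only the `arguments` object needs to be assembled by hand.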
describe_cloud_connection
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed connection.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed connection in Airbyte Cloud.",
"properties": {
"connection_id": {
"type": "string"
},
"connection_name": {
"type": "string"
},
"connection_url": {
"type": "string"
},
"source_id": {
"type": "string"
},
"source_name": {
"type": "string"
},
"destination_id": {
"type": "string"
},
"destination_name": {
"type": "string"
},
"selected_streams": {
"items": {
"type": "string"
},
"type": "array"
},
"table_prefix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
}
},
"required": [
"connection_id",
"connection_name",
"connection_url",
"source_id",
"source_name",
"destination_id",
"destination_name",
"selected_streams",
"table_prefix"
],
"type": "object"
}
describe_cloud_destination
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed destination connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the destination to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the destination to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed destination connector in Airbyte Cloud.",
"properties": {
"destination_id": {
"type": "string"
},
"destination_name": {
"type": "string"
},
"destination_url": {
"type": "string"
},
"connector_definition_id": {
"type": "string"
}
},
"required": [
"destination_id",
"destination_name",
"destination_url",
"connector_definition_id"
],
"type": "object"
}
describe_cloud_organization
Hints: read-only · idempotent · open-world
Get details about a specific organization including billing status.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| organization_id | string \| null | no | null | Organization ID. Required if organization_name is not provided. |
| organization_name | string \| null | no | null | Organization name (exact match). Required if organization_id is not provided. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization ID. Required if organization_name is not provided."
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization name (exact match). Required if organization_id is not provided."
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Information about an organization in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"email": {
"type": "string"
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"id",
"name",
"email"
],
"type": "object"
}
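Both parameters are optional in the schema, yet the description requires at least one of them, so the schema alone will not catch a call that omits both. A small sketch of that check follows. `organization_lookup_args` is a hypothetical helper, and it assumes that supplying both values is acceptable, which the description does not state.

```python
from typing import Optional


def organization_lookup_args(
    organization_id: Optional[str] = None,
    organization_name: Optional[str] = None,
) -> dict[str, str]:
    """Hypothetical helper: build arguments, failing early if both are missing."""
    if not organization_id and not organization_name:
        raise ValueError("provide organization_id or organization_name")
    args: dict[str, str] = {}
    if organization_id:
        args["organization_id"] = organization_id
    if organization_name:
        # Matched exactly, according to the parameter description.
        args["organization_name"] = organization_name
    return args
```

The same check fits `list_cloud_workspaces`, which documents the same pair of parameters.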
describe_cloud_source
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed source connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the source to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the source to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed source connector in Airbyte Cloud.",
"properties": {
"source_id": {
"type": "string"
},
"source_name": {
"type": "string"
},
"source_url": {
"type": "string"
},
"connector_definition_id": {
"type": "string"
}
},
"required": [
"source_id",
"source_name",
"source_url",
"connector_definition_id"
],
"type": "object"
}
get_cloud_sync_logs
Hints: read-only · idempotent · open-world
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| job_id | integer \| null | no | null | Optional job ID. If not provided, the latest job will be used. |
| attempt_number | integer \| null | no | null | Optional attempt number. If not provided, the latest attempt will be used. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| max_lines | integer | no | 4000 | Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied. |
| from_tail | boolean \| null | no | null | Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if line_offset is not specified. Cannot combine from_tail=True with line_offset. |
| line_offset | integer \| null | no | null | Number of lines to skip from the beginning of the logs. Cannot be combined with from_tail=True. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"job_id": {
"anyOf": [
{
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Optional job ID. If not provided, the latest job will be used."
},
{
"type": "null"
}
],
"default": null
},
"attempt_number": {
"anyOf": [
{
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Optional attempt number. If not provided, the latest attempt will be used."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"max_lines": {
"default": 4000,
"description": "Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.",
"type": "integer"
},
"from_tail": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`."
},
"line_offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of reading sync logs with pagination support.",
"properties": {
"job_id": {
"type": "integer"
},
"attempt_number": {
"type": "integer"
},
"log_text": {
"type": "string"
},
"log_text_start_line": {
"type": "integer"
},
"log_text_line_count": {
"type": "integer"
},
"total_log_lines_available": {
"type": "integer"
}
},
"required": [
"job_id",
"attempt_number",
"log_text",
"log_text_start_line",
"log_text_line_count",
"total_log_lines_available"
],
"type": "object"
}
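The interaction of `max_lines`, `from_tail`, and `line_offset` is easier to follow as code. The sketch below is one possible reading of the parameter descriptions, not the server's implementation. `select_log_window` is a hypothetical function, and it returns a zero-based start index because the documentation does not say whether `log_text_start_line` is zero-based or one-based.

```python
from typing import Optional


def select_log_window(
    lines: list[str],
    max_lines: int = 4000,
    from_tail: Optional[bool] = None,
    line_offset: Optional[int] = None,
) -> tuple[int, list[str]]:
    """Return (zero-based start index, selected lines) for a log."""
    if from_tail and line_offset is not None:
        raise ValueError("from_tail=True cannot be combined with line_offset")
    if from_tail is None:
        # Documented default: tail mode unless an offset is given.
        from_tail = line_offset is None
    total = len(lines)
    limit = total if max_lines == 0 else max_lines  # 0 means no limit
    if from_tail:
        start = max(total - limit, 0)
    else:
        # Clamp the offset into the valid range of the log.
        start = min(max(line_offset or 0, 0), total)
    return start, lines[start : start + limit]
```

With a ten-line log and `max_lines=3`, the defaults select the last three lines, while `line_offset=2` selects lines 2 through 4 counted from zero. To page forward through a long log, pass `line_offset` and advance it by the number of lines returned.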
get_cloud_sync_status
Hints: read-only · idempotent · open-world
Get the status of a sync job from Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| job_id | integer \| null | no | null | Optional job ID. If not provided, the latest job will be used. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| include_attempts | boolean | no | false | Whether to include detailed attempts information. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional job ID. If not provided, the latest job will be used."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"include_attempts": {
"default": false,
"description": "Whether to include detailed attempts information.",
"type": "boolean"
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
get_connection_artifact
Hints: read-only · idempotent · open-world
Get a connection artifact (state or catalog) from Airbyte Cloud.
Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| artifact_type | enum("state", "catalog") | yes | — | The type of artifact to retrieve: 'state' or 'catalog'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"artifact_type": {
"description": "The type of artifact to retrieve: 'state' or 'catalog'.",
"enum": [
"state",
"catalog"
],
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"artifact_type"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
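Since the output schema is a free-form object and a missing artifact is reported as `{"ERROR": "..."}` rather than as a tool failure, callers need to inspect the result themselves. A minimal sketch, assuming a real artifact never carries a top-level "ERROR" key; `ArtifactUnavailable` and `unwrap_artifact` are hypothetical names.

```python
from typing import Any


class ArtifactUnavailable(Exception):
    """Hypothetical error type for a response that carries an ERROR key."""


def unwrap_artifact(response: dict[str, Any]) -> dict[str, Any]:
    """Return the artifact, or raise if the tool reported an error."""
    if "ERROR" in response:
        raise ArtifactUnavailable(str(response["ERROR"]))
    return response
```

Raising here keeps an error payload from being mistaken for connection state or a catalog further down the pipeline.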
get_connector_builder_draft_manifest
Hints: read-only · idempotent · open-world
Get the Connector Builder draft manifest for a custom source definition.
Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.
If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to retrieve the draft for. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to retrieve the draft for.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
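The description names two response fields, 'has_draft' and 'draft_manifest', and a caller will usually branch on the first before reading the second. The sketch below shows that branch. `draft_manifest_or_none` is a hypothetical helper, and the type of the manifest value is left open because the output schema does not specify it.

```python
from typing import Any, Optional


def draft_manifest_or_none(response: dict[str, Any]) -> Optional[Any]:
    """Hypothetical helper: return the draft only when has_draft is truthy."""
    if response.get("has_draft"):
        return response.get("draft_manifest")
    return None
```

A `None` result means there are no unpublished changes to inspect, so only the published manifest applies.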
get_custom_source_definition
Hints: read-only · idempotent · open-world
Get a custom YAML source definition from Airbyte Cloud, including its manifest.
Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to retrieve. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| include_draft | boolean | no | false | Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to retrieve.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"include_draft": {
"default": false,
"description": "Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.",
"type": "boolean"
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
list_cloud_sync_jobs
Hints: read-only · idempotent · open-world
List sync jobs for a connection with pagination support.
This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| max_jobs | integer | no | 20 | Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500. |
| from_tail | boolean \| null | no | null | When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if jobs_offset is not specified. Cannot combine from_tail=True with jobs_offset. |
| jobs_offset | integer \| null | no | null | Number of jobs to skip from the beginning. Cannot be combined with from_tail=True. |
| job_type | enum("sync", "reset", "refresh", "clear") \| null | no | null | Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"max_jobs": {
"default": 20,
"description": "Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.",
"type": "integer"
},
"from_tail": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`."
},
"jobs_offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`."
},
"job_type": {
"anyOf": [
{
"description": "Enum that describes the different types of jobs that the platform runs.",
"enum": [
"sync",
"reset",
"refresh",
"clear"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of listing sync jobs with pagination support.",
"properties": {
"jobs": {
"items": {
"description": "Information about a sync job.",
"properties": {
"job_id": {
"type": "integer"
},
"status": {
"type": "string"
},
"bytes_synced": {
"type": "integer"
},
"records_synced": {
"type": "integer"
},
"start_time": {
"type": "string"
},
"job_url": {
"type": "string"
}
},
"required": [
"job_id",
"status",
"bytes_synced",
"records_synced",
"start_time",
"job_url"
],
"type": "object"
},
"type": "array"
},
"jobs_count": {
"type": "integer"
},
"jobs_offset": {
"type": "integer"
},
"from_tail": {
"type": "boolean"
}
},
"required": [
"jobs",
"jobs_count",
"jobs_offset",
"from_tail"
],
"type": "object"
}
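The pagination parameters carry three documented constraints: `max_jobs` may not exceed 500, `from_tail=True` cannot be combined with `jobs_offset`, and `job_type` must be one of four values. The sketch below checks them before building the arguments. `build_list_jobs_args` is a hypothetical helper, and it does not enforce a lower bound on `max_jobs` because none is documented.

```python
from typing import Any, Optional

MAX_JOBS_LIMIT = 500  # documented maximum for max_jobs
JOB_TYPES = ("sync", "reset", "refresh", "clear")  # documented enum values


def build_list_jobs_args(
    connection_id: str,
    max_jobs: int = 20,
    from_tail: Optional[bool] = None,
    jobs_offset: Optional[int] = None,
    job_type: Optional[str] = None,
) -> dict[str, Any]:
    """Hypothetical helper: validate and assemble list_cloud_sync_jobs arguments."""
    if max_jobs > MAX_JOBS_LIMIT:
        raise ValueError(f"max_jobs may not exceed {MAX_JOBS_LIMIT}")
    if from_tail and jobs_offset is not None:
        raise ValueError("from_tail=True cannot be combined with jobs_offset")
    if job_type is not None and job_type not in JOB_TYPES:
        raise ValueError(f"job_type must be one of {JOB_TYPES}")
    args: dict[str, Any] = {"connection_id": connection_id, "max_jobs": max_jobs}
    # Unset optional values are left out so the server-side defaults apply.
    optional = (("from_tail", from_tail), ("jobs_offset", jobs_offset), ("job_type", job_type))
    for key, value in optional:
        if value is not None:
            args[key] = value
    return args
```

To walk a job history from oldest to newest, pass `jobs_offset` and increase it by `jobs_count` from each response. Leaving `job_type` unset returns sync and reset jobs only, so refresh and clear jobs need an explicit filter.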
list_cloud_workspaces
Hints: read-only · idempotent · open-world
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations; you must specify
which organization to list workspaces from.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| organization_id | string \| null | no | null | Organization ID. Required if organization_name is not provided. |
| organization_name | string \| null | no | null | Organization name (exact match). Required if organization_id is not provided. |
| name_contains | string \| null | no | null | Optional substring to filter workspaces by name (server-side filtering) |
| max_items_limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization ID. Required if organization_name is not provided."
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization name (exact match). Required if organization_id is not provided."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional substring to filter workspaces by name (server-side filtering)"
},
"max_items_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a workspace in Airbyte Cloud.",
"properties": {
"workspace_id": {
"type": "string"
},
"workspace_name": {
"type": "string"
},
"workspace_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"organization_id": {
"type": "string"
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"workspace_id",
"workspace_name",
"organization_id"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_custom_source_definitions
Hints: read-only · idempotent · open-world
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_deployed_cloud_connections
Hints: read-only · idempotent · open-world
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter connections by name |
| max_items_limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
| with_connection_status | boolean \| null | no | false | If True, include status info for each connection's most recent sync job |
| failing_connections_only | boolean \| null | no | false | If True, only return connections with failed/cancelled last sync |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter connections by name"
},
"max_items_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
},
"with_connection_status": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": false,
"description": "If True, include status info for each connection's most recent sync job"
},
"failing_connections_only": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": false,
"description": "If True, only return connections with failed/cancelled last sync"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed connection in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
},
"source_id": {
"type": "string"
},
"destination_id": {
"type": "string"
},
"last_job_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"last_job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"last_job_time": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"currently_running_job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"currently_running_job_start_time": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"id",
"name",
"url",
"source_id",
"destination_id"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
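A minimal sketch of how a client might build arguments for list_deployed_cloud_connections and read the wrapped response. It assumes the structured response matches the output schema above (a `result` array of connection objects). The status strings `failed` and `cancelled` are assumptions taken from the wording of the description, not confirmed values, and both helper names are hypothetical.

```python
from typing import Any, Optional

# Status strings treated as "failing". The exact values are an assumption.
FAILING_STATUSES = {"failed", "cancelled"}


def build_list_connections_args(
    name_contains: Optional[str] = None,
    max_items_limit: Optional[int] = None,
    with_connection_status: bool = False,
    failing_connections_only: bool = False,
) -> dict[str, Any]:
    """Build the tool arguments, leaving out optional filters that are unset."""
    args: dict[str, Any] = {
        "with_connection_status": with_connection_status,
        "failing_connections_only": failing_connections_only,
    }
    if name_contains is not None:
        args["name_contains"] = name_contains
    if max_items_limit is not None:
        args["max_items_limit"] = max_items_limit
    return args


def failing_connections(response: dict[str, Any]) -> list[dict[str, Any]]:
    """Unwrap the `result` array and keep entries whose last job failed."""
    return [
        conn
        for conn in response["result"]
        if (conn.get("last_job_status") or "").lower() in FAILING_STATUSES
    ]
```

The local filter is only useful on a response fetched with with_connection_status enabled; otherwise last_job_status stays null and nothing is selected. Passing failing_connections_only lets the server do the same filtering.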
list_deployed_cloud_destination_connectors
Hints: read-only · idempotent · open-world
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter destinations by name |
| max_items_limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter destinations by name"
},
"max_items_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed destination connector in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"id",
"name",
"url"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_deployed_cloud_source_connectors
Hints: read-only · idempotent · open-world
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter sources by name |
| max_items_limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter sources by name"
},
"max_items_limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed source connector in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"id",
"name",
"url"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_cloud_connection
Hints: destructive · open-world
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.
The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to delete. |
| name | string | yes | — | The expected name of the connection (for verification). |
| cascade_delete_source | boolean | no | false | Whether to also delete the source connector associated with this connection. |
| cascade_delete_destination | boolean | no | false | Whether to also delete the destination connector associated with this connection. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the connection (for verification).",
"type": "string"
},
"cascade_delete_source": {
"default": false,
"description": "Whether to also delete the source connector associated with this connection.",
"type": "boolean"
},
"cascade_delete_destination": {
"default": false,
"description": "Whether to also delete the destination connector associated with this connection.",
"type": "boolean"
}
},
"required": [
"connection_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
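The naming rule shared by the permanently_delete_* tools can be checked locally before a call is attempted. The sketch below mirrors the documented rule only; it is not the server's implementation. It assumes that "must match" means an exact string comparison, and the helper name is hypothetical.

```python
# Markers the resource name must contain, compared case-insensitively.
DELETE_MARKERS = ("delete-me", "deleteme")


def deletion_would_be_allowed(actual_name: str, provided_name: str) -> bool:
    """Return True if a delete call is expected to pass the documented name checks."""
    # The provided name must match the resource's actual name (exact match assumed).
    if provided_name != actual_name:
        return False
    # The name must contain one of the markers, ignoring case.
    lowered = actual_name.lower()
    return any(marker in lowered for marker in DELETE_MARKERS)
```

A check like this lets an agent explain up front why a deletion would be rejected, and ask the user to rename the resource, instead of discovering it from the error message.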
permanently_delete_cloud_destination
Hints: destructive · open-world
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.
The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to delete. |
| name | string | yes | — | The expected name of the destination (for verification). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the destination (for verification).",
"type": "string"
}
},
"required": [
"destination_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_cloud_source
Hints: destructive · open-world
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.
The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to delete. |
| name | string | yes | — | The expected name of the source (for verification). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the source (for verification).",
"type": "string"
}
},
"required": [
"source_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_custom_source_definition
Hints: destructive · open-world
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to delete. |
| name | string | yes | — | The expected name of the custom source definition (for verification). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the custom source definition (for verification).",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"definition_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
publish_custom_source_definition
Hints: open-world
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | — | The name for the custom connector definition. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| manifest_yaml | string \| string (path) \| null | no | null | The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors. |
| unique | boolean | no | true | Whether to require a unique name. |
| pre_validate | boolean | no | true | Whether to validate the manifest client-side before publishing. |
| testing_values | object \| string \| null | no | null | Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations. |
| testing_values_secret_name | string \| null | no | null | Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name": {
"description": "The name for the custom connector definition.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"manifest_yaml": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors."
},
{
"type": "null"
}
],
"default": null
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
},
"pre_validate": {
"default": true,
"description": "Whether to validate the manifest client-side before publishing.",
"type": "boolean"
},
"testing_values": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations."
},
"testing_values_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
}
},
"required": [
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
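As an illustration of the inline secret syntax accepted by testing_values, the sketch below builds a hypothetical argument payload for publish_custom_source_definition and lists which top-level values are secret references. The connector name, manifest path, key names, and environment variable name are all placeholders. The helper is a local aid only; resolving the references is done by the MCP server, and nested values are not inspected here.

```python
from typing import Any

# Prefix of the inline secret reference syntax described above.
SECRET_PREFIX = "secret_reference::"


def secret_refs(testing_values: dict[str, Any]) -> dict[str, str]:
    """Map each top-level key that uses the inline syntax to its env var name."""
    return {
        key: value[len(SECRET_PREFIX):]
        for key, value in testing_values.items()
        if isinstance(value, str) and value.startswith(SECRET_PREFIX)
    }


# Hypothetical tool arguments; every value here is a placeholder.
publish_args: dict[str, Any] = {
    "name": "my-custom-api",
    "manifest_yaml": "manifest.yaml",  # a YAML string or a file path
    "pre_validate": True,
    "testing_values": {
        "api_key": "secret_reference::MY_API_KEY",
        "start_date": "2024-01-01",
    },
}
```

Keeping the raw secret out of the payload is the point of the syntax: only the environment variable name travels as a tool argument.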
rename_cloud_connection
Hints: open-world
Rename a connection on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to rename. |
| name | string | yes | — | New name for the connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to rename.",
"type": "string"
},
"name": {
"description": "New name for the connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
rename_cloud_destination
Hints: open-world
Rename a deployed destination connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to rename. |
| name | string | yes | — | New name for the destination. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to rename.",
"type": "string"
},
"name": {
"description": "New name for the destination.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
rename_cloud_source
Hints: open-world
Rename a deployed source connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to rename. |
| name | string | yes | — | New name for the source. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to rename.",
"type": "string"
},
"name": {
"description": "New name for the source.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
run_cloud_sync
Hints: open-world
Run a sync job on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| wait | boolean | no | false | Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios. |
| wait_timeout | integer | no | 300 | Maximum time to wait for sync completion (seconds). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"wait": {
"default": false,
"description": "Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.",
"type": "boolean"
},
"wait_timeout": {
"default": 300,
"description": "Maximum time to wait for sync completion (seconds).",
"type": "integer"
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
set_cloud_connection_selected_streams
Hints: destructive · open-world
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| stream_names | string \| array<string> | yes | — | The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"stream_names": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
],
"description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"stream_names"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
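Because stream_names for set_cloud_connection_selected_streams accepts either a single string or an array of strings, a client may want to normalize its input before building the call. The sketch below is one possible approach, under the assumption that a single string is one stream name (not a delimited list) and that empty names are never valid. The helper name is hypothetical.

```python
from typing import Union


def normalize_stream_names(stream_names: Union[str, list[str]]) -> list[str]:
    """Return an explicit, non-empty list of stream names."""
    # A bare string is treated as exactly one stream name (assumption).
    names = [stream_names] if isinstance(stream_names, str) else list(stream_names)
    cleaned = [name.strip() for name in names]
    if not cleaned or any(not name for name in cleaned):
        raise ValueError("stream_names must contain at least one non-empty stream name")
    return cleaned
```

Since the tool replaces the connection's stream selection, passing the complete list of streams that should remain selected seems the safer reading of the description.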
set_cloud_connection_table_prefix
Hints: destructive · open-world
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| prefix | string | yes | — | New table prefix to use when syncing to the destination. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"prefix": {
"description": "New table prefix to use when syncing to the destination.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"prefix"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
update_cloud_connection
Hints: destructive · open-world
Update a connection's settings on Airbyte Cloud.
This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)
At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| enabled | boolean \| null | no | null | Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status. |
| cron_expression | string \| null | no | null | A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'. |
| manual_schedule | boolean \| null | no | null | Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"enabled": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status."
},
"cron_expression": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'."
},
"manual_schedule": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
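The two constraints on update_cloud_connection (at least one setting, and cron_expression not combined with manual_schedule) can be checked on the client before the call is made. The following is a minimal sketch with a hypothetical helper name. It treats supplying both parameters at all as a conflict, which is an assumption; the server may be more lenient, for example when manual_schedule is False.

```python
from typing import Any, Optional


def build_update_connection_args(
    connection_id: str,
    enabled: Optional[bool] = None,
    cron_expression: Optional[str] = None,
    manual_schedule: Optional[bool] = None,
) -> dict[str, Any]:
    """Validate the documented constraints and build the tool arguments."""
    # The two scheduling parameters cannot be used together.
    if cron_expression is not None and manual_schedule is not None:
        raise ValueError("cron_expression and manual_schedule are mutually exclusive")
    settings = {
        "enabled": enabled,
        "cron_expression": cron_expression,
        "manual_schedule": manual_schedule,
    }
    # Only send settings that were actually provided; unset ones keep their current value.
    provided = {key: value for key, value in settings.items() if value is not None}
    if not provided:
        raise ValueError("provide at least one of enabled, cron_expression, manual_schedule")
    return {"connection_id": connection_id, **provided}
```

For example, `build_update_connection_args("<connection-id>", enabled=True, cron_expression="0 */6 * * *")` yields arguments that enable the connection and schedule it every 6 hours, using one of the cron examples from the parameter description.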
update_cloud_destination_config
Hints: destructive · open-world
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to update. |
| config | object \| string | yes | — | New configuration for the destination connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to update.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
}
],
"description": "New configuration for the destination connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id",
"config"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
update_cloud_source_config
Hints: destructive · open-world
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to update. |
| config | object \| string | yes | — | New configuration for the source connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to update.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
}
],
"description": "New configuration for the source connector."
},
"config_secret_name": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id",
"config"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
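Because update_cloud_source_config is marked destructive, it may help to review what a new configuration changes before sending it. The sketch below assumes the caller already holds the current configuration as a dict (how to obtain it is outside this section). The helper diff_config and the sample keys are hypothetical, and the comparison is top-level only.

```python
def diff_config(current: dict, proposed: dict) -> dict:
    """Hypothetical helper: summarize top-level differences between two configs."""
    return {
        "added": sorted(proposed.keys() - current.keys()),
        "removed": sorted(current.keys() - proposed.keys()),
        "changed": sorted(
            key
            for key in current.keys() & proposed.keys()
            if current[key] != proposed[key]
        ),
    }


# Sample values for illustration only; real connector configs differ.
current = {"host": "db.internal", "port": 5432, "ssl": True}
proposed = {"host": "db.internal", "port": 6432, "schema": "public"}

changes = diff_config(current, proposed)
print(changes)
```

A non-empty "removed" list is worth a second look, since a key that silently disappears is one way an update can break existing connections.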
update_custom_source_definition
Hints: destructive · open-world
Update a custom YAML source definition in Airbyte Cloud.
Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
definition_id |
string |
yes | — | The ID of the definition to update. |
manifest_yaml |
string | null |
no | null |
New manifest as YAML string or file path. Optional; omit to update only testing values. |
workspace_id |
string | null |
no | null |
Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
pre_validate |
boolean |
no | true |
Whether to validate the manifest client-side before updating. |
testing_values |
object | string | null |
no | null |
Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist. |
testing_values_secret_name |
string | null |
no | null |
Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments. |
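Since testing_values replaces the stored object wholesale, a caller that wants to change a single value still has to send every value. A minimal sketch, assuming the caller keeps track of the existing values itself: merged_testing_values and the keys are hypothetical, and MY_API_KEY is a made-up environment variable name used with the secret_reference:: syntax described above.

```python
def merged_testing_values(existing: dict, changes: dict) -> dict:
    """Hypothetical helper: return the full object to send as testing_values."""
    # Shallow merge: keys in `changes` win, all other existing keys are kept.
    return {**existing, **changes}


# Illustrative values only.
existing = {
    "start_date": "2024-01-01",
    "api_key": "secret_reference::MY_API_KEY",
}
testing_values = merged_testing_values(existing, {"start_date": "2024-06-01"})
print(testing_values)
```

The merge is shallow by design here; nested objects in `changes` would replace their counterparts entirely.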
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the definition to update.",
"type": "string"
},
"manifest_yaml": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "New manifest as YAML string or file path. Optional; omit to update only testing values."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"pre_validate": {
"default": true,
"description": "Whether to validate the manifest client-side before updating.",
"type": "boolean"
},
"testing_values": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist."
},
"testing_values_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
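The rule that at least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided can also be checked before calling update_custom_source_definition. The sketch below is an assumption about how a client might do that; build_update_definition_args is a hypothetical helper and the sample values are placeholders.

```python
from __future__ import annotations


def build_update_definition_args(
    definition_id: str,
    *,
    manifest_yaml: str | None = None,
    testing_values: dict | str | None = None,
    testing_values_secret_name: str | None = None,
    pre_validate: bool = True,
    workspace_id: str | None = None,
) -> dict:
    """Hypothetical helper: assemble arguments for update_custom_source_definition."""
    updates = {
        "manifest_yaml": manifest_yaml,
        "testing_values": testing_values,
        "testing_values_secret_name": testing_values_secret_name,
    }
    if all(value is None for value in updates.values()):
        raise ValueError(
            "Provide at least one of manifest_yaml, testing_values, "
            "or testing_values_secret_name."
        )
    args = {"definition_id": definition_id, "pre_validate": pre_validate}
    # Only send the optional arguments that were actually set.
    args.update({k: v for k, v in updates.items() if v is not None})
    if workspace_id is not None:
        args["workspace_id"] = workspace_id
    return args


# Placeholder values for illustration only.
definition_args = build_update_definition_args(
    "<definition-id>",
    testing_values_secret_name="MY_TESTING_VALUES",
)
print(definition_args)
```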
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations.

.. include:: ../../docs/mcp-generated/cloud.md
"""

# No public Python API: MCP primitives are registered via decorators and
# documented via the generated Markdown include above. Setting `__all__` to an
# empty list tells pdoc (and other doc tools) not to surface the individual
# tool / helper definitions as a redundant "API Documentation" list.
__all__: list[str] = []

from pathlib import Path
from typing import Annotated, Any, Literal, cast

from airbyte_api.models import JobTypeEnum
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte._util import api_util
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.constants import FAILED_STATUSES
from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
from airbyte.constants import (
    MCP_CONFIG_API_URL,
    MCP_CONFIG_BEARER_TOKEN,
    MCP_CONFIG_CLIENT_ID,
    MCP_CONFIG_CLIENT_SECRET,
    MCP_CONFIG_WORKSPACE_ID,
)
from airbyte.destinations.util import get_noop_destination
from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.mcp._tool_utils import (
    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
    check_guid_created_in_session,
    register_guid_created_in_session,
)
from airbyte.secrets import SecretString


CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."


class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""


class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""


class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""


class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""


class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""


class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed')."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state."""


class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed'). Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""


class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""


class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""


class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""


def _get_cloud_workspace(
    ctx: Context,
    workspace_id: str | None = None,
) -> CloudWorkspace:
    """Get an authenticated CloudWorkspace.

    Resolves credentials from multiple sources via MCP config args in order:
    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
    2. Environment variables

    The ctx parameter provides access to MCP config values that are resolved
    from HTTP headers or environment variables based on the config args
    defined in server.py.
    """
    resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID)
    if not resolved_workspace_id:
        raise PyAirbyteInputError(
            message="Workspace ID is required but not provided.",
            guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.",
        )

    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    return CloudWorkspace(
        workspace_id=resolved_workspace_id,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
        api_root=api_url,
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    ctx: Context,
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    source = get_source(
        source_connector_name,
        no_executor=True,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_source = workspace.deploy_source(
        name=source_name,
        source=source,
        unique=unique,
    )

    register_guid_created_in_session(deployed_source.connector_id)
    return (
        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
        f" and URL: {deployed_source.connector_url}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    ctx: Context,
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID: {deployed_destination.connector_id}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    ctx: Context,
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_connection = workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=resolved_streams_list,
        table_prefix=table_prefix,
    )

    register_guid_created_in_session(deployed_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{deployed_connection.connection_id}' and "
        f"URL: {deployed_connection.connection_url}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    if wait:
        status = sync_result.get_job_status()
        return (
            f"Sync completed with status: {status}. "
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )
    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, organization info, and billing status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Get workspace details from the public API using workspace's credentials
    workspace_response = api_util.get_workspace(
        workspace_id=workspace.workspace_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    # Try to get organization info (including billing), but fail gracefully if we don't have
    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
    # organization, which may not be available with workspace-scoped credentials.
    organization = workspace.get_organization(raise_on_error=False)

    return CloudWorkspaceResult(
        workspace_id=workspace_response.workspace_id,
        workspace_name=workspace_response.name,
        workspace_url=workspace.workspace_url,
        organization_id=(
            organization.organization_id
            if organization
            else "[unavailable - requires ORGANIZATION_READER permission]"
        ),
        organization_name=organization.organization_name if organization else None,
        payment_status=organization.payment_status if organization else None,
        subscription_status=organization.subscription_status if organization else None,
        is_account_locked=organization.is_account_locked if organization else False,
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    ctx: Context,
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # If a job ID is provided, get the job by ID.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        return {"status": None, "job_id": None, "attempts": []}

    result = {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": [],
    }

    if include_attempts:
        attempts = sync_result.get_attempts()
        result["attempts"] = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in attempts
        ]

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
    job_type: Annotated[
        JobTypeEnum | None,
        Field(
            description=(
                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
                "If not specified, defaults to sync and reset jobs only (API default). "
                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool allows you to retrieve a list of sync jobs for a connection,
    with control over ordering and pagination. By default, jobs are returned
    newest-first (from_tail=True).
    """
    # Validate that jobs_offset and from_tail are not both set
    if jobs_offset is not None and from_tail is True:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # Default to from_tail=True if neither is specified
    if from_tail is None and jobs_offset is None:
        from_tail = True
    elif from_tail is None:
        from_tail = False

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Cap at 500 to avoid overloading agent context
    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20

    sync_results = connection.get_previous_sync_logs(
        limit=effective_limit,
        offset=jobs_offset,
        from_tail=from_tail,
        job_type=job_type,
    )

    jobs = [
        SyncJobResult(
            job_id=sync_result.job_id,
            status=str(sync_result.get_job_status()),
            bytes_synced=sync_result.bytes_synced,
            records_synced=sync_result.records_synced,
            start_time=sync_result.start_time.isoformat(),
            job_url=sync_result.job_url,
        )
        for sync_result in sync_results
    ]

    return SyncJobListResult(
        jobs=jobs,
        jobs_count=len(jobs),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    sources = workspace.list_sources()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        sources = sources[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=source.source_id,
            name=cast(str, source.name),
            url=cast(str, source.connector_url),
        )
        for source in sources
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destinations = workspace.list_destinations()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        destinations = destinations[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=destination.destination_id,
            name=cast(str, destination.name),
            url=cast(str, destination.connector_url),
        )
        for destination in destinations
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)

    # Access name property to ensure _connector_info is populated
    source_name = cast(str, source.name)

    return CloudSourceDetails(
        source_id=source.source_id,
        source_name=source_name,
        source_url=source.connector_url,
        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Access name property to ensure _connector_info is populated
    destination_name = cast(str, destination.name)

    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=destination_name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    return CloudConnectionDetails(
        connection_id=connection.connection_id,
        connection_name=cast(str, connection.name),
        connection_url=cast(str, connection.connection_url),
        source_id=connection.source_id,
        source_name=cast(str, connection.source.name),
        destination_id=connection.destination_id,
        destination_name=cast(str, connection.destination.name),
        selected_streams=connection.stream_names,
        table_prefix=connection.table_prefix,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results


def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> CloudOrganization:
    """Resolve organization from either ID or exact name match.

    Args:
        organization_id: The organization ID (if provided directly)
        organization_name: The organization name (exact match required)
        api_root: The API root URL
        client_id: OAuth client ID (optional if bearer_token is provided)
        client_secret: OAuth client secret (optional if bearer_token is provided)
        bearer_token: Bearer token for authentication (optional if client credentials provided)

    Returns:
        A CloudOrganization object with credentials for lazy loading of billing info.

    Raises:
        PyAirbyteInputError: If neither or both parameters are provided,
            or if no organization matches the exact name
        AirbyteMissingResourceError: If the organization is not found
    """
    if organization_id and organization_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not organization_id and not organization_name:
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Get all organizations for the user
    orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )

    org_response: api_util.models.OrganizationResponse | None = None

    if organization_id:
        # Find by ID
        matching_orgs = [org for org in orgs if org.organization_id == organization_id]
        if not matching_orgs:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
        org_response = matching_orgs[0]
    else:
        # Find by exact name match (case-sensitive)
        matching_orgs = [org for org in orgs if org.organization_name == organization_name]

        if not matching_orgs:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_name": organization_name,
                    "message": f"No organization found with exact name '{organization_name}' "
                    "for the current user.",
                },
            )

        if len(matching_orgs) > 1:
            raise PyAirbyteInputError(
                message=f"Multiple organizations found with name '{organization_name}'. "
                "Please use 'organization_id' instead to specify the exact organization."
            )

        org_response = matching_orgs[0]

    # Return a CloudOrganization with credentials for lazy loading of billing info
    return CloudOrganization(
        organization_id=org_response.organization_id,
        organization_name=org_response.organization_name,
        email=org_response.email,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )


def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> str:
    """Resolve organization ID from either ID or exact name match.

    This is a convenience wrapper around _resolve_organization that returns just the ID.
    """
    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )
    return org.organization_id


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    workspaces = api_util.list_workspaces_in_organization(
        organization_id=resolved_org_id,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [
        CloudWorkspaceResult(
            workspace_id=ws.get("workspaceId", ""),
            workspace_name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    # CloudOrganization has lazy loading of billing properties
    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )


def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
1558) -> str: 1559 return "\n".join( 1560 [ 1561 f" - Custom Source Name: {custom_source.name}", 1562 f" - Definition ID: {custom_source.definition_id}", 1563 f" - Definition Version: {custom_source.version}", 1564 f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}", 1565 f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}", 1566 ] 1567 ) 1568 1569 1570@mcp_tool( 1571 open_world=True, 1572 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1573) 1574def publish_custom_source_definition( 1575 ctx: Context, 1576 name: Annotated[ 1577 str, 1578 Field(description="The name for the custom connector definition."), 1579 ], 1580 *, 1581 workspace_id: Annotated[ 1582 str | None, 1583 Field( 1584 description=WORKSPACE_ID_TIP_TEXT, 1585 default=None, 1586 ), 1587 ], 1588 manifest_yaml: Annotated[ 1589 str | Path | None, 1590 Field( 1591 description=( 1592 "The Low-code CDK manifest as a YAML string or file path. " 1593 "Required for YAML connectors." 1594 ), 1595 default=None, 1596 ), 1597 ] = None, 1598 unique: Annotated[ 1599 bool, 1600 Field( 1601 description="Whether to require a unique name.", 1602 default=True, 1603 ), 1604 ] = True, 1605 pre_validate: Annotated[ 1606 bool, 1607 Field( 1608 description="Whether to validate the manifest client-side before publishing.", 1609 default=True, 1610 ), 1611 ] = True, 1612 testing_values: Annotated[ 1613 dict | str | None, 1614 Field( 1615 description=( 1616 "Optional testing configuration values for the Builder UI. " 1617 "Can be provided as a JSON object or JSON string. " 1618 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1619 "If provided, these values replace any existing testing values " 1620 "for the connector builder project, allowing immediate test read operations." 
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_draft: Annotated[
        bool,
        Field(
            description=(
                "Whether to include the Connector Builder draft manifest in the response. "
                "If True and a draft exists, the response will include 'has_draft' and "
                "'draft_manifest' fields. Defaults to False."
            ),
            default=False,
        ),
    ] = False,
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the published manifest YAML content.
    Optionally includes the Connector Builder draft manifest (unpublished changes)
    when include_draft=True.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    result: dict[str, Any] = {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "version": definition.version,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "manifest": definition.manifest,
    }

    if include_draft:
        result["has_draft"] = definition.has_draft
        result["draft_manifest"] = definition.draft_manifest

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connector_builder_draft_manifest(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve the draft for."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the Connector Builder draft manifest for a custom source definition.

    Returns the working draft manifest that has been saved in the Connector Builder UI
    but not yet published. This is useful for inspecting what a user is currently working
    on before they publish their changes.

    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
    The published manifest is always included for comparison.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    return {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "has_draft": definition.has_draft,
        "draft_manifest": definition.draft_manifest,
        "published_manifest": definition.manifest,
    }


@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    if processed_manifest is not None:
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )


@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.
1968 1969 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1970 (case insensitive). 1971 1972 If the connector does not meet this requirement, the deletion will be rejected with a 1973 helpful error message. Instruct the user to rename the connector appropriately to authorize 1974 the deletion. 1975 1976 The provided name must match the actual name of the definition for the operation to proceed. 1977 This is a safety measure to ensure you are deleting the correct resource. 1978 1979 Note: Only YAML (declarative) connectors are currently supported. 1980 Docker-based custom sources are not yet available. 1981 """ 1982 check_guid_created_in_session(definition_id) 1983 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1984 definition = workspace.get_custom_source_definition( 1985 definition_id=definition_id, 1986 definition_type="yaml", 1987 ) 1988 actual_name: str = definition.name 1989 1990 # Verify the name matches 1991 if actual_name != name: 1992 raise PyAirbyteInputError( 1993 message=( 1994 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1995 "The provided name must exactly match the definition's actual name. " 1996 "This is a safety measure to prevent accidental deletion." 1997 ), 1998 context={ 1999 "definition_id": definition_id, 2000 "expected_name": name, 2001 "actual_name": actual_name, 2002 }, 2003 ) 2004 2005 definition.permanently_delete( 2006 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 
    )
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, connection.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_connection(
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)
    source.rename(name=name)
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)

    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    source.update_config(config=config_dict)
    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )


@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive.
    """
    check_guid_created_in_session(connection_id)

    # Validate that at least one setting is provided
    if enabled is None and cron_expression is None and manual_schedule is None:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # Validate mutually exclusive schedule options
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    changes_made: list[str] = []

    # Apply enabled status change
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the full raw connection state including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    if artifact_type == "state":
        result = connection.dump_raw_state()
        if result.get("stateType") == "not_set":
            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
        return result

    # artifact_type == "catalog"
    result = connection.dump_raw_catalog()
    if result is None:
        return {"ERROR": "No catalog found for this connection"}
    return result


def _add_defaults_for_exclude_args(
    exclude_args: list[str],
) -> None:
    """Patch registered tool functions to add Python-level defaults for excluded args.

    FastMCP requires that excluded args have Python-level default values, but MCP tool
    functions should only use Field(default=...) in their Annotated type hints (not
    Python-level `= None`). This function bridges the gap by dynamically adding Python
    defaults to the function signatures at registration time, so the source code stays
    clean while satisfying FastMCP's requirement.

    Args:
        exclude_args: List of argument names that will be excluded from the tool schema.
    """
    import inspect  # noqa: PLC0415  # Local import for optional patching logic

    from fastmcp_extensions.decorators import (  # noqa: PLC0415
        _REGISTERED_TOOLS,  # noqa: PLC2701
    )

    for func, _annotations in _REGISTERED_TOOLS:
        sig = inspect.signature(func)
        needs_patch = any(
            arg_name in sig.parameters
            and sig.parameters[arg_name].default is inspect.Parameter.empty
            for arg_name in exclude_args
        )
        if needs_patch:
            new_params = [
                p.replace(default=None)
                if name in exclude_args and p.default is inspect.Parameter.empty
                else p
                for name, p in sig.parameters.items()
            ]
            func.__signature__ = sig.replace(parameters=new_params)  # type: ignore[attr-defined]


def register_cloud_tools(app: FastMCP) -> None:
    """Register cloud tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
    if exclude_args:
        _add_defaults_for_exclude_args(exclude_args)
    register_mcp_tools(
        app,
        mcp_module=__name__,
        exclude_args=exclude_args,
    )
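The signature patching done by _add_defaults_for_exclude_args can be tried in isolation using only the standard library. The sketch below is illustrative and is not part of the module: example_tool and add_none_defaults are hypothetical names, and the FastMCP registry is replaced by a single plain function. It assumes the parameter being patched is keyword-only, as workspace_id is in the tools above; for a positional parameter followed by one without a default, Signature.replace would raise ValueError.

```python
import inspect
from typing import Optional


def example_tool(connection_id: str, *, workspace_id: Optional[str]) -> str:
    """Hypothetical stand-in for a registered tool function."""
    return f"{connection_id}:{workspace_id}"


def add_none_defaults(func, exclude_args: list) -> None:
    """Give each named parameter a default of None if it has no default yet."""
    sig = inspect.signature(func)
    new_params = [
        p.replace(default=None)
        if name in exclude_args and p.default is inspect.Parameter.empty
        else p
        for name, p in sig.parameters.items()
    ]
    # inspect.signature() honors __signature__, so introspection-based
    # callers see the patched default. How the function behaves when
    # called directly is not changed by this assignment.
    func.__signature__ = sig.replace(parameters=new_params)


add_none_defaults(example_tool, ["workspace_id"])
patched = inspect.signature(example_tool)
print(patched.parameters["workspace_id"].default)
```

Only the introspected signature changes here. Calling example_tool("abc") directly would still fail with a TypeError, because the function's real keyword defaults are untouched; the patch matters only to code that inspects the signature.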