airbyte.mcp.cloud
Airbyte Cloud MCP operations.
cloud module
MCP primitives registered by the cloud module of the airbyte-mcp server: 35 tools, 0 prompts, and 0 resources.
Tools (35)
check_airbyte_cloud_workspace
Hints: read-only · idempotent · open-world
Check whether a valid connection to Airbyte Cloud can be established and return workspace info.
Returns workspace details including workspace ID, name, organization info, and billing status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Information about a workspace in Airbyte Cloud.",
"properties": {
"workspace_id": {
"type": "string"
},
"workspace_name": {
"type": "string"
},
"workspace_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"organization_id": {
"type": "string"
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"workspace_id",
"workspace_name",
"organization_id"
],
"type": "object"
}
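As a rough illustration of how a client might invoke this tool, the sketch below builds a JSON-RPC 2.0 `tools/call` request of the kind the Model Context Protocol uses. The helper name `build_tool_call` and the request id are hypothetical; only the tool name and the `workspace_id` parameter come from the schema above. How the request is transported to the server is left out.

```python
import json
from typing import Any, Optional


def build_tool_call(
    name: str,
    arguments: Optional[dict[str, Any]] = None,
    request_id: int = 1,
) -> dict[str, Any]:
    """Build an MCP `tools/call` request body (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments or {}},
    }


# No arguments: the server is expected to fall back on
# AIRBYTE_CLOUD_WORKSPACE_ID from its environment.
request = build_tool_call("check_airbyte_cloud_workspace")
print(json.dumps(request, indent=2))
```

Passing `{"workspace_id": "..."}` as `arguments` would target a specific workspace instead of the environment default.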
create_connection_on_cloud
Hints: open-world
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_name | string | yes | — | The name of the connection. |
| source_id | string | yes | — | The ID of the deployed source. |
| destination_id | string | yes | — | The ID of the deployed destination. |
| selected_streams | string \| array<string> | yes | — | The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| table_prefix | string \| null | no | null | Optional table prefix to use when syncing to the destination. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_name": {
"description": "The name of the connection.",
"type": "string"
},
"source_id": {
"description": "The ID of the deployed source.",
"type": "string"
},
"destination_id": {
"description": "The ID of the deployed destination.",
"type": "string"
},
"selected_streams": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
],
"description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"table_prefix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional table prefix to use when syncing to the destination."
}
},
"required": [
"connection_name",
"source_id",
"destination_id",
"selected_streams"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
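The `selected_streams` rule (an explicit name or list, never empty and never `'*'`) can be checked on the client before the tool is called. The following is a minimal sketch under that reading; the helper name `validate_selected_streams` and the sample stream names are hypothetical, and treating a bare string as a single stream name is an assumption, since the server may parse strings differently.

```python
from typing import Union


def validate_selected_streams(selected_streams: Union[str, list[str]]) -> None:
    """Raise ValueError if the value is empty or uses the '*' wildcard."""
    # Assumption: a bare string is one stream name, so both shapes share one check.
    if isinstance(selected_streams, str):
        names = [selected_streams]
    else:
        names = list(selected_streams)
    if not names:
        raise ValueError("selected_streams cannot be empty")
    for name in names:
        if not name.strip():
            raise ValueError("selected_streams cannot contain an empty name")
        if name.strip() == "*":
            raise ValueError("selected_streams cannot be '*'; list streams explicitly")


# Illustrative stream names; passes silently.
validate_selected_streams(["users", "purchases"])
```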
deploy_destination_to_cloud
Hints: open-world
Deploy a destination connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_name | string | yes | — | The name to use when deploying the destination. |
| destination_connector_name | string | yes | — | The name of the destination connector (e.g., 'destination-postgres'). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| config | object \| string \| null | no | null | The configuration for the destination connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| unique | boolean | no | true | Whether to require a unique name. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_name": {
"description": "The name to use when deploying the destination.",
"type": "string"
},
"destination_connector_name": {
"description": "The name of the destination connector (e.g., 'destination-postgres').",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the destination connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
}
},
"required": [
"destination_name",
"destination_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
deploy_noop_destination_to_cloud
Hints: open-world
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | no | "No-op Destination" | |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| unique | boolean | no | true | |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name": {
"default": "No-op Destination",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"unique": {
"default": true,
"type": "boolean"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
deploy_source_to_cloud
Hints: open-world
Deploy a source connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_name | string | yes | — | The name to use when deploying the source. |
| source_connector_name | string | yes | — | The name of the source connector (e.g., 'source-faker'). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| config | object \| string \| null | no | null | The configuration for the source connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| unique | boolean | no | true | Whether to require a unique name. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_name": {
"description": "The name to use when deploying the source.",
"type": "string"
},
"source_connector_name": {
"description": "The name of the source connector (e.g., 'source-faker').",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The configuration for the source connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
}
},
"required": [
"source_name",
"source_connector_name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
describe_cloud_connection
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed connection.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed connection in Airbyte Cloud.",
"properties": {
"connection_id": {
"type": "string"
},
"connection_name": {
"type": "string"
},
"connection_url": {
"type": "string"
},
"source_id": {
"type": "string"
},
"source_name": {
"type": "string"
},
"destination_id": {
"type": "string"
},
"destination_name": {
"type": "string"
},
"selected_streams": {
"items": {
"type": "string"
},
"type": "array"
},
"table_prefix": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
]
}
},
"required": [
"connection_id",
"connection_name",
"connection_url",
"source_id",
"source_name",
"destination_id",
"destination_name",
"selected_streams",
"table_prefix"
],
"type": "object"
}
describe_cloud_destination
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed destination connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the destination to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the destination to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed destination connector in Airbyte Cloud.",
"properties": {
"destination_id": {
"type": "string"
},
"destination_name": {
"type": "string"
},
"destination_url": {
"type": "string"
},
"connector_definition_id": {
"type": "string"
}
},
"required": [
"destination_id",
"destination_name",
"destination_url",
"connector_definition_id"
],
"type": "object"
}
describe_cloud_organization
Hints: read-only · idempotent · open-world
Get details about a specific organization including billing status.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| organization_id | string \| null | no | null | Organization ID. Required if organization_name is not provided. |
| organization_name | string \| null | no | null | Organization name (exact match). Required if organization_id is not provided. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization ID. Required if organization_name is not provided."
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization name (exact match). Required if organization_id is not provided."
}
},
"type": "object"
}
Show output JSON schema
{
"description": "Information about an organization in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"email": {
"type": "string"
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"id",
"name",
"email"
],
"type": "object"
}
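Because both parameters are optional in the schema yet one of them must be supplied, a caller may want to enforce that rule before sending the request. The sketch below shows one way to do so; the helper name `organization_lookup_args` and the sample organization name are hypothetical. It only rejects the case where neither identifier is given, and it passes both through if both are supplied, since the text above does not say how the server treats that case.

```python
from typing import Optional


def organization_lookup_args(
    organization_id: Optional[str] = None,
    organization_name: Optional[str] = None,
) -> dict[str, str]:
    """Return tool arguments, omitting whichever identifier was not given."""
    if organization_id is None and organization_name is None:
        raise ValueError("provide organization_id or organization_name")
    args: dict[str, str] = {}
    if organization_id is not None:
        args["organization_id"] = organization_id
    if organization_name is not None:
        args["organization_name"] = organization_name
    return args


# Look up an organization's ID from its exact name (illustrative value).
print(organization_lookup_args(organization_name="Example Org"))
```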
describe_cloud_source
Hints: read-only · idempotent · open-world
Get detailed information about a specific deployed source connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the source to describe. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the source to describe.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Detailed information about a deployed source connector in Airbyte Cloud.",
"properties": {
"source_id": {
"type": "string"
},
"source_name": {
"type": "string"
},
"source_url": {
"type": "string"
},
"connector_definition_id": {
"type": "string"
}
},
"required": [
"source_id",
"source_name",
"source_url",
"connector_definition_id"
],
"type": "object"
}
get_cloud_sync_logs
Hints: read-only · idempotent · open-world
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| job_id | integer \| null | no | null | Optional job ID. If not provided, the latest job will be used. |
| attempt_number | integer \| null | no | null | Optional attempt number. If not provided, the latest attempt will be used. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| max_lines | integer | no | 4000 | Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied. |
| from_tail | boolean \| null | no | null | Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if line_offset is not specified. Cannot combine from_tail=True with line_offset. |
| line_offset | integer \| null | no | null | Number of lines to skip from the beginning of the logs. Cannot be combined with from_tail=True. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"job_id": {
"anyOf": [
{
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Optional job ID. If not provided, the latest job will be used."
},
{
"type": "null"
}
],
"default": null
},
"attempt_number": {
"anyOf": [
{
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"description": "Optional attempt number. If not provided, the latest attempt will be used."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"max_lines": {
"default": 4000,
"description": "Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.",
"type": "integer"
},
"from_tail": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`."
},
"line_offset": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of reading sync logs with pagination support.",
"properties": {
"job_id": {
"type": "integer"
},
"attempt_number": {
"type": "integer"
},
"log_text": {
"type": "string"
},
"log_text_start_line": {
"type": "integer"
},
"log_text_line_count": {
"type": "integer"
},
"total_log_lines_available": {
"type": "integer"
}
},
"required": [
"job_id",
"attempt_number",
"log_text",
"log_text_start_line",
"log_text_line_count",
"total_log_lines_available"
],
"type": "object"
}
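The one documented conflict among these parameters is `from_tail=True` together with `line_offset`. A caller could guard against it when assembling the arguments, as in the minimal sketch below. The helper name `sync_log_args` and the placeholder connection ID are hypothetical; optional fields that were not set are left out so that the server-side defaults described above still apply.

```python
from typing import Any, Optional


def sync_log_args(
    connection_id: str,
    max_lines: int = 4000,
    from_tail: Optional[bool] = None,
    line_offset: Optional[int] = None,
) -> dict[str, Any]:
    """Build arguments for get_cloud_sync_logs, rejecting the documented conflict."""
    if from_tail is True and line_offset is not None:
        raise ValueError("from_tail=True cannot be combined with line_offset")
    args: dict[str, Any] = {"connection_id": connection_id, "max_lines": max_lines}
    # Only send optional fields that were explicitly set.
    if from_tail is not None:
        args["from_tail"] = from_tail
    if line_offset is not None:
        args["line_offset"] = line_offset
    return args


# Skip the first 1000 lines and read up to 500 more (placeholder connection ID).
print(sync_log_args("<connection-id>", max_lines=500, line_offset=1000))
```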
get_cloud_sync_status
Hints: read-only · idempotent · open-world
Get the status of a sync job from Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| job_id | integer \| null | no | null | Optional job ID. If not provided, the latest job will be used. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| include_attempts | boolean | no | false | Whether to include detailed attempts information. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional job ID. If not provided, the latest job will be used."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"include_attempts": {
"default": false,
"description": "Whether to include detailed attempts information.",
"type": "boolean"
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
get_connection_artifact
Hints: read-only · idempotent · open-world
Get a connection artifact (state or catalog) from Airbyte Cloud.
By default, returns artifacts in Airbyte protocol format (snake_case, suitable for passing to connector CLI flags like `--state` or `--catalog`).
Retrieves the specified artifact for a connection:
- `state`: Returns a list of protocol-format `AirbyteStateMessage` dicts, or `{"ERROR": "..."}` if no state is set.
- `catalog`: Returns the protocol-format `ConfiguredAirbyteCatalog` dict, or `{"ERROR": "..."}` if not found.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| artifact_type | enum("state", "catalog") | yes | — | The type of artifact to retrieve: 'state' or 'catalog'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"artifact_type": {
"description": "The type of artifact to retrieve: 'state' or 'catalog'.",
"enum": [
"state",
"catalog"
],
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"artifact_type"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
]
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
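Since a missing artifact is reported as an `{"ERROR": "..."}` object rather than as a failure, callers may want to turn that object into an exception before using the payload. The sketch below is one way to do that; the helper name `unwrap_artifact` and the choice of `LookupError` are hypothetical, and it is assumed that the value passed in is the content of the `result` field from the output schema above.

```python
from typing import Any, Union

Artifact = Union[dict[str, Any], list[dict[str, Any]]]


def unwrap_artifact(result: Artifact) -> Artifact:
    """Return the artifact, or raise if the tool reported an error object."""
    # Only a dict can be the documented error shape; a state artifact is a list.
    if isinstance(result, dict) and "ERROR" in result:
        raise LookupError(str(result["ERROR"]))
    return result


# Made-up payloads for illustration only.
print(unwrap_artifact([{"type": "STREAM"}]))
try:
    unwrap_artifact({"ERROR": "No state is set for this connection."})
except LookupError as exc:
    print(f"artifact unavailable: {exc}")
```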
get_connector_builder_draft_manifest
Hints: read-only · idempotent · open-world
Get the Connector Builder draft manifest for a custom source definition.
Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.
If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to retrieve the draft for. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to retrieve the draft for.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
get_custom_source_definition
Hints: read-only · idempotent · open-world
Get a custom YAML source definition from Airbyte Cloud, including its manifest.
Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to retrieve. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| include_draft | boolean | no | false | Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to retrieve.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"include_draft": {
"default": false,
"description": "Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.",
"type": "boolean"
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"additionalProperties": true,
"type": "object"
}
list_cloud_sync_jobs
Hints: read-only · idempotent · open-world
List sync jobs for a connection, with an optional limit on the number returned.
This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and result limit. By default, jobs are returned
newest-first (`from_tail=True`).
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| max_jobs | integer | no | 20 | Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500. |
| from_tail | boolean \| null | no | null | When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True. |
| job_type | enum("sync", "reset", "refresh", "clear") \| null | no | null | Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"max_jobs": {
"default": 20,
"description": "Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.",
"type": "integer"
},
"from_tail": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True."
},
"job_type": {
"anyOf": [
{
"description": "Enum that describes the different types of jobs that the platform runs.",
"enum": [
"sync",
"reset",
"refresh",
"clear"
],
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"description": "Result of listing sync jobs with limit support.",
"properties": {
"jobs": {
"items": {
"description": "Information about a sync job.",
"properties": {
"job_id": {
"type": "integer"
},
"status": {
"type": "string"
},
"bytes_synced": {
"type": "integer"
},
"records_synced": {
"type": "integer"
},
"start_time": {
"type": "string"
},
"job_url": {
"type": "string"
}
},
"required": [
"job_id",
"status",
"bytes_synced",
"records_synced",
"start_time",
"job_url"
],
"type": "object"
},
"type": "array"
},
"jobs_count": {
"type": "integer"
},
"from_tail": {
"type": "boolean"
}
},
"required": [
"jobs",
"jobs_count",
"from_tail"
],
"type": "object"
}
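The output schema above lends itself to simple aggregation on the client. As a minimal sketch, the helper below counts jobs per status and totals the synced bytes and records. The name `summarize_jobs` is hypothetical, and the sample payload, including its status strings and URLs, is made up to match the schema's shape rather than taken from a real response.

```python
from collections import Counter
from typing import Any


def summarize_jobs(result: dict[str, Any]) -> dict[str, Any]:
    """Aggregate status counts and totals from a list_cloud_sync_jobs result."""
    jobs = result["jobs"]
    return {
        "status_counts": dict(Counter(job["status"] for job in jobs)),
        "total_bytes": sum(job["bytes_synced"] for job in jobs),
        "total_records": sum(job["records_synced"] for job in jobs),
    }


# Made-up sample shaped like the output schema; values are illustrative only.
sample = {
    "jobs": [
        {"job_id": 2, "status": "failed", "bytes_synced": 0, "records_synced": 0,
         "start_time": "2024-01-02T00:00:00Z", "job_url": "https://example.invalid/jobs/2"},
        {"job_id": 1, "status": "succeeded", "bytes_synced": 2048, "records_synced": 10,
         "start_time": "2024-01-01T00:00:00Z", "job_url": "https://example.invalid/jobs/1"},
    ],
    "jobs_count": 2,
    "from_tail": True,
}
print(summarize_jobs(sample))
```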
list_cloud_workspaces
Hints: read-only · idempotent · open-world
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| organization_id | string \| null | no | null | Organization ID. Required if organization_name is not provided. |
| organization_name | string \| null | no | null | Organization name (exact match). Required if organization_id is not provided. |
| name_contains | string \| null | no | null | Optional substring to filter workspaces by name (server-side filtering). |
| limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"organization_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization ID. Required if organization_name is not provided."
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Organization name (exact match). Required if organization_id is not provided."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional substring to filter workspaces by name (server-side filtering)"
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a workspace in Airbyte Cloud.",
"properties": {
"workspace_id": {
"type": "string"
},
"workspace_name": {
"type": "string"
},
"workspace_url": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"organization_id": {
"type": "string"
},
"organization_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"payment_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"subscription_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"is_account_locked": {
"default": false,
"type": "boolean"
}
},
"required": [
"workspace_id",
"workspace_name",
"organization_id"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
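Several fields in each workspace entry are optional and carry defaults, so client code should not assume they are present. The sketch below picks out workspaces flagged as locked while tolerating a missing `is_account_locked` field. The helper name `locked_workspace_ids` and the sample entries are hypothetical, and the input is assumed to be the list held in the `result` field of the output schema above.

```python
from typing import Any


def locked_workspace_ids(workspaces: list[dict[str, Any]]) -> list[str]:
    """Return IDs of workspaces whose account is flagged as locked."""
    # is_account_locked is optional in the schema and defaults to false.
    return [
        ws["workspace_id"]
        for ws in workspaces
        if ws.get("is_account_locked", False)
    ]


# Made-up entries containing only the fields this sketch reads or requires.
sample = [
    {"workspace_id": "ws-a", "workspace_name": "A", "organization_id": "org-1"},
    {"workspace_id": "ws-b", "workspace_name": "B", "organization_id": "org-1",
     "is_account_locked": True},
]
print(locked_workspace_ids(sample))
```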
list_custom_source_definitions
Hints: read-only · idempotent · open-world
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_deployed_cloud_connections
Hints: read-only · idempotent · open-world
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
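The status rules described above can be sketched as follows. This is an illustration of the documented behavior, not the server's implementation: the job dictionaries, the newest-first ordering, and the exact status strings (`"pending"`, `"running"`, `"succeeded"`, `"failed"`, `"cancelled"`) are assumptions made for the example.

```python
from typing import Optional

# Assumed status labels; the real API may use different strings.
IN_PROGRESS = {"pending", "running"}
FAILING = {"failed", "cancelled"}


def last_completed_job(jobs: list[dict]) -> Optional[dict]:
    """Return the newest job that is not in progress (jobs are newest first)."""
    for job in jobs:
        if job["status"] not in IN_PROGRESS:
            return job
    return None


def is_failing(jobs: list[dict]) -> bool:
    """True when the most recent completed job failed or was cancelled."""
    job = last_completed_job(jobs)
    return job is not None and job["status"] in FAILING


# A running sync is skipped, so the earlier failure decides the outcome.
history = [
    {"id": 3, "status": "running"},
    {"id": 2, "status": "failed"},
    {"id": 1, "status": "succeeded"},
]
```

Under these assumptions, a connection with the `history` above would be returned when `failing_connections_only` is set, even though a new sync is already underway.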
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter connections by name |
| limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
| with_connection_status | boolean \| null | no | false | If True, include status info for each connection's most recent sync job |
| failing_connections_only | boolean \| null | no | false | If True, only return connections with failed/cancelled last sync |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter connections by name"
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
},
"with_connection_status": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": false,
"description": "If True, include status info for each connection's most recent sync job"
},
"failing_connections_only": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": false,
"description": "If True, only return connections with failed/cancelled last sync"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed connection in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
},
"source_id": {
"type": "string"
},
"destination_id": {
"type": "string"
},
"last_job_status": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"last_job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"last_job_time": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
},
"currently_running_job_id": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null
},
"currently_running_job_start_time": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null
}
},
"required": [
"id",
"name",
"url",
"source_id",
"destination_id"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_deployed_cloud_destination_connectors
Hints: read-only · idempotent · open-world
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter destinations by name |
| limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter destinations by name"
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed destination connector in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"id",
"name",
"url"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
list_deployed_cloud_source_connectors
Hints: read-only · idempotent · open-world
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| name_contains | string \| null | no | null | Optional case-insensitive substring to filter sources by name |
| limit | integer \| null | no | null | Optional maximum number of items to return (default: no limit) |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"name_contains": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional case-insensitive substring to filter sources by name"
},
"limit": {
"anyOf": [
{
"type": "integer"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional maximum number of items to return (default: no limit)"
}
},
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"items": {
"description": "Information about a deployed source connector in Airbyte Cloud.",
"properties": {
"id": {
"type": "string"
},
"name": {
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"id",
"name",
"url"
],
"type": "object"
},
"type": "array"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_cloud_connection
Hints: destructive · open-world
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.
The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
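The two safety conditions above (a matching name, and a name containing "delete-me" or "deleteme") can be sketched as a small check. This is an illustrative reading of the documented rule, not the server's code: the function name `deletion_errors` is hypothetical, and treating the name comparison as an exact, case-sensitive match is an assumption.

```python
def deletion_errors(provided_name: str, actual_name: str) -> list[str]:
    """Return the reasons a deletion would be rejected (empty list if none)."""
    errors = []
    # Assumption: the provided name must equal the actual name exactly.
    if provided_name != actual_name:
        errors.append(
            f"Name mismatch: resource is named {actual_name!r}, got {provided_name!r}."
        )
    # The marker check is case-insensitive, as documented.
    lowered = actual_name.lower()
    if "delete-me" not in lowered and "deleteme" not in lowered:
        errors.append(
            f"{actual_name!r} must contain 'delete-me' or 'deleteme'; rename it first."
        )
    return errors
```

A rejected deletion is typically resolved by renaming the resource with the matching rename tool and then retrying with the new name. The same rule is documented for the destination, source, and custom source definition delete tools.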
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to delete. |
| name | string | yes | — | The expected name of the connection (for verification). |
| cascade_delete_source | boolean | no | false | Whether to also delete the source connector associated with this connection. |
| cascade_delete_destination | boolean | no | false | Whether to also delete the destination connector associated with this connection. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the connection (for verification).",
"type": "string"
},
"cascade_delete_source": {
"default": false,
"description": "Whether to also delete the source connector associated with this connection.",
"type": "boolean"
},
"cascade_delete_destination": {
"default": false,
"description": "Whether to also delete the destination connector associated with this connection.",
"type": "boolean"
}
},
"required": [
"connection_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_cloud_destination
Hints: destructive · open-world
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.
The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to delete. |
| name | string | yes | — | The expected name of the destination (for verification). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the destination (for verification).",
"type": "string"
}
},
"required": [
"destination_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_cloud_source
Hints: destructive · open-world
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.
The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to delete. |
| name | string | yes | — | The expected name of the source (for verification). |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the source (for verification).",
"type": "string"
}
},
"required": [
"source_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
permanently_delete_custom_source_definition
Hints: destructive · open-world
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the custom source definition to delete. |
| name | string | yes | — | The expected name of the custom source definition (for verification). |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the custom source definition to delete.",
"type": "string"
},
"name": {
"description": "The expected name of the custom source definition (for verification).",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"definition_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
publish_custom_source_definition
Hints: open-world
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | — | The name for the custom connector definition. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| manifest_yaml | string \| null | no | null | The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors. |
| unique | boolean | no | true | Whether to require a unique name. |
| pre_validate | boolean | no | true | Whether to validate the manifest client-side before publishing. |
| testing_values | object \| string \| null | no | null | Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations. |
| testing_values_secret_name | string \| null | no | null | Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments. |
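The interplay between `testing_values` and `testing_values_secret_name` can be sketched as a merge followed by reference resolution. This is a rough illustration under stated assumptions, not the server's implementation: it assumes a shallow merge, assumes a `secret_reference::` value occupies the whole string, and looks names up in a plain dict standing in for the environment. The helper names are hypothetical.

```python
import json
from typing import Any, Optional, Union

SECRET_PREFIX = "secret_reference::"


def resolve_refs(values: dict[str, Any], env: dict[str, str]) -> dict[str, Any]:
    """Replace 'secret_reference::NAME' strings with env[NAME], recursively."""
    resolved: dict[str, Any] = {}
    for key, value in values.items():
        if isinstance(value, dict):
            resolved[key] = resolve_refs(value, env)
        elif isinstance(value, str) and value.startswith(SECRET_PREFIX):
            resolved[key] = env[value[len(SECRET_PREFIX):]]
        else:
            resolved[key] = value
    return resolved


def build_testing_values(
    testing_values: Union[dict[str, Any], str, None],
    secret_values: Optional[dict[str, Any]],
    env: dict[str, str],
) -> dict[str, Any]:
    """Merge inline values with secret values; secret values take precedence."""
    if isinstance(testing_values, str):
        testing_values = json.loads(testing_values)  # JSON string form
    # Assumption: a shallow merge where later keys win.
    merged = {**(testing_values or {}), **(secret_values or {})}
    return resolve_refs(merged, env)


# Hypothetical inputs, for illustration only.
fake_env = {"API_KEY": "s3cret"}
inline = '{"api_key": "secret_reference::API_KEY", "start_date": "2024-01-01"}'
from_secret = {"start_date": "2025-01-01"}
final_values = build_testing_values(inline, from_secret, fake_env)
```

The point of both mechanisms is that the agent passes only a reference (an environment variable name or a secret name), so raw credentials never appear in the tool arguments.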
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"name": {
"description": "The name for the custom connector definition.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"manifest_yaml": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors."
},
{
"type": "null"
}
],
"default": null
},
"unique": {
"default": true,
"description": "Whether to require a unique name.",
"type": "boolean"
},
"pre_validate": {
"default": true,
"description": "Whether to validate the manifest client-side before publishing.",
"type": "boolean"
},
"testing_values": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations."
},
"testing_values_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
}
},
"required": [
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
rename_cloud_connection
Hints: open-world
Rename a connection on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to rename. |
| name | string | yes | — | New name for the connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to rename.",
"type": "string"
},
"name": {
"description": "New name for the connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
rename_cloud_destination
Hints: open-world
Rename a deployed destination connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to rename. |
| name | string | yes | — | New name for the destination. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to rename.",
"type": "string"
},
"name": {
"description": "New name for the destination.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
rename_cloud_source
Hints: open-world
Rename a deployed source connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to rename. |
| name | string | yes | — | New name for the source. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to rename.",
"type": "string"
},
"name": {
"description": "New name for the source.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id",
"name"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
run_cloud_sync
Hints: open-world
Run a sync job on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the Airbyte Cloud connection. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| wait | boolean | no | false | Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios. |
| wait_timeout | integer | no | 300 | Maximum time to wait for sync completion (seconds). |
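To make the `wait` and `wait_timeout` semantics concrete, here is a minimal sketch of a bounded polling loop. It is not the tool's implementation: the `get_status` callable, the status strings, the poll interval, and the choice to return `None` on timeout are all assumptions made for illustration.

```python
import time
from typing import Callable, Optional

# Assumed terminal status labels; the real API may use different strings.
TERMINAL = {"succeeded", "failed", "cancelled"}


def wait_for_sync(
    get_status: Callable[[], str],
    wait_timeout: float = 300,
    poll_interval: float = 10.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Poll until a terminal status is seen; return None if the timeout passes."""
    deadline = clock() + wait_timeout
    while True:
        status = get_status()
        if status in TERMINAL:
            return status
        if clock() >= deadline:
            return None  # still running when the timeout elapsed
        sleep(poll_interval)


# Simulated status sequence with a no-op sleep, so the example runs instantly.
statuses = iter(["running", "running", "succeeded"])
final_status = wait_for_sync(lambda: next(statuses), sleep=lambda seconds: None)
```

With the default of 300 seconds, a sync that takes hours would outlast the wait, which is why leaving `wait` at `false` and checking the job status later is the pattern the parameter description recommends.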
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the Airbyte Cloud connection.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"wait": {
"default": false,
"description": "Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.",
"type": "boolean"
},
"wait_timeout": {
"default": 300,
"description": "Maximum time to wait for sync completion (seconds).",
"type": "integer"
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
set_cloud_connection_selected_streams
Hints: destructive · open-world
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| stream_names | string \| array<string> | yes | — | The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
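As an illustration of how these parameters travel over the wire, the sketch below builds an MCP `tools/call` JSON-RPC request for this tool. The helper name `build_tool_call`, the placeholder connection ID, and the stream names are hypothetical; how the request is actually sent depends on the MCP client in use.

```python
import json
from typing import Any


def build_tool_call(
    name: str, arguments: dict[str, Any], request_id: int = 1
) -> dict[str, Any]:
    """Build a JSON-RPC 2.0 request that invokes an MCP tool."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }


request = build_tool_call(
    "set_cloud_connection_selected_streams",
    {
        "connection_id": "<connection-id>",  # placeholder, not a real ID
        "stream_names": ["users", "orders"],  # explicit list of stream names
    },
)
print(json.dumps(request, indent=2))
```

Since the tool is marked destructive, it is worth listing the connection's current streams first and passing the complete set you intend to keep.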
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"stream_names": {
"anyOf": [
{
"type": "string"
},
{
"items": {
"type": "string"
},
"type": "array"
}
],
"description": "The selected stream names to sync within the connection. Must be an explicit stream name or list of streams."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"stream_names"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
set_cloud_connection_table_prefix
Hints: destructive · open-world
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| prefix | string | yes | — | New table prefix to use when syncing to the destination. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"prefix": {
"description": "New table prefix to use when syncing to the destination.",
"type": "string"
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id",
"prefix"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
update_cloud_connection
Hints: destructive · open-world
Update a connection's settings on Airbyte Cloud.
This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)
At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
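The argument rules above (at least one setting, and `cron_expression` and `manual_schedule` being mutually exclusive) can be checked client-side before calling the tool. The sketch below is one possible reading, not the server's validation code: the function name is hypothetical, and treating any non-null `manual_schedule` as "provided" is an assumption.

```python
from typing import Optional


def update_arg_errors(
    enabled: Optional[bool] = None,
    cron_expression: Optional[str] = None,
    manual_schedule: Optional[bool] = None,
) -> list[str]:
    """Return the documented rule violations for a set of arguments."""
    errors = []
    if enabled is None and cron_expression is None and manual_schedule is None:
        errors.append("At least one setting must be provided.")
    # Assumption: any non-null manual_schedule counts as provided.
    if cron_expression is not None and manual_schedule is not None:
        errors.append("'cron_expression' and 'manual_schedule' are mutually exclusive.")
    return errors


# Enabling a connection and scheduling it every 6 hours in one call is allowed.
ok = update_arg_errors(enabled=True, cron_expression="0 */6 * * *")
# Asking for both a cron schedule and manual scheduling is not.
conflict = update_arg_errors(cron_expression="0 0 * * *", manual_schedule=True)
```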
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| connection_id | string | yes | — | The ID of the connection to update. |
| enabled | boolean \| null | no | null | Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status. |
| cron_expression | string \| null | no | null | A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'. |
| manual_schedule | boolean \| null | no | null | Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"connection_id": {
"description": "The ID of the connection to update.",
"type": "string"
},
"enabled": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status."
},
"cron_expression": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'."
},
"manual_schedule": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "null"
}
],
"default": null,
"description": "Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"connection_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
update_cloud_destination_config
Hints: destructive · open-world
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| destination_id | string | yes | — | The ID of the deployed destination to update. |
| config | object \| string | yes | — | New configuration for the destination connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"destination_id": {
"description": "The ID of the deployed destination to update.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
}
],
"description": "New configuration for the destination connector."
},
"config_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"destination_id",
"config"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
update_cloud_source_config
Hints: destructive · open-world
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| source_id | string | yes | — | The ID of the deployed source to update. |
| config | object \| string | yes | — | New configuration for the source connector. |
| config_secret_name | string \| null | no | null | The name of the secret containing the configuration. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"source_id": {
"description": "The ID of the deployed source to update.",
"type": "string"
},
"config": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
}
],
"description": "New configuration for the source connector."
},
"config_secret_name": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "The name of the secret containing the configuration."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
}
},
"required": [
"source_id",
"config"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
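As an illustration of the input schema for `update_cloud_source_config`, the sketch below builds an MCP `tools/call` request and checks the arguments against the schema's `required` list and `additionalProperties: false` before sending. The helper name, the placeholder source ID, and the sample config are assumptions made for illustration. The check is a simplified stand-in for full JSON Schema validation.

```python
import json
from typing import Any

# Field names taken from the input schema above.
REQUIRED = ("source_id", "config")
ALLOWED = {"source_id", "config", "config_secret_name", "workspace_id"}


def build_update_source_call(arguments: dict[str, Any], request_id: int = 1) -> str:
    """Build a JSON-RPC `tools/call` request body (hypothetical helper)."""
    missing = [key for key in REQUIRED if key not in arguments]
    if missing:
        raise ValueError(f"Missing required arguments: {missing}")
    unknown = sorted(set(arguments) - ALLOWED)
    if unknown:
        # The schema sets additionalProperties to false.
        raise ValueError(f"Unknown arguments: {unknown}")
    return json.dumps(
        {
            "jsonrpc": "2.0",
            "id": request_id,
            "method": "tools/call",
            "params": {
                "name": "update_cloud_source_config",
                "arguments": arguments,
            },
        }
    )


# Placeholder values only; substitute a real source ID and connector config.
request_body = build_update_source_call(
    {"source_id": "<source-id>", "config": {"example_key": "example_value"}}
)
```

Because the tool is marked destructive, a client may want to show the built request to a user for confirmation before sending it.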
update_custom_source_definition
Hints: destructive · open-world
Update a custom YAML source definition in Airbyte Cloud.
Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
Parameters
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| definition_id | string | yes | — | The ID of the definition to update. |
| manifest_yaml | string \| null | no | null | New manifest as YAML string or file path. Optional; omit to update only testing values. |
| workspace_id | string \| null | no | null | Workspace ID. Defaults to AIRBYTE_CLOUD_WORKSPACE_ID env var. |
| pre_validate | boolean | no | true | Whether to validate the manifest client-side before updating. |
| testing_values | object \| string \| null | no | null | Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist. |
| testing_values_secret_name | string \| null | no | null | Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments. |
Show input JSON schema
{
"additionalProperties": false,
"properties": {
"definition_id": {
"description": "The ID of the definition to update.",
"type": "string"
},
"manifest_yaml": {
"anyOf": [
{
"anyOf": [
{
"type": "string"
},
{
"format": "path",
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "New manifest as YAML string or file path. Optional; omit to update only testing values."
},
{
"type": "null"
}
],
"default": null
},
"workspace_id": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
},
"pre_validate": {
"default": true,
"description": "Whether to validate the manifest client-side before updating.",
"type": "boolean"
},
"testing_values": {
"anyOf": [
{
"additionalProperties": true,
"type": "object"
},
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist."
},
"testing_values_secret_name": {
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"default": null,
"description": "Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments."
}
},
"required": [
"definition_id"
],
"type": "object"
}
Show output JSON schema
{
"properties": {
"result": {
"type": "string"
}
},
"required": [
"result"
],
"type": "object",
"x-fastmcp-wrap-result": true
}
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations. 3 4.. include:: ../../docs/mcp-generated/cloud.md 5""" 6 7# No public Python API — MCP primitives are registered via decorators and 8# documented via the generated Markdown include above. Setting `__all__` to an 9# empty list tells pdoc (and other doc tools) not to surface the individual 10# tool / helper definitions as a redundant "API Documentation" list. 11__all__: list[str] = [] 12 13from pathlib import Path 14from typing import Annotated, Any, Literal, cast 15 16from airbyte_api.models import JobTypeEnum 17from fastmcp import Context, FastMCP 18from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools 19from pydantic import BaseModel, Field 20 21from airbyte import cloud, get_destination, get_source 22from airbyte._util import api_util 23from airbyte.cloud.client import CloudClient 24from airbyte.cloud.connectors import CustomCloudSourceDefinition 25from airbyte.cloud.constants import FAILED_STATUSES 26from airbyte.cloud.workspaces import CloudWorkspace 27from airbyte.constants import ( 28 MCP_CONFIG_API_URL, 29 MCP_CONFIG_BEARER_TOKEN, 30 MCP_CONFIG_CLIENT_ID, 31 MCP_CONFIG_CLIENT_SECRET, 32 MCP_CONFIG_CONFIG_API_URL, 33 MCP_CONFIG_WORKSPACE_ID, 34) 35from airbyte.destinations.util import get_noop_destination 36from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError 37from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings 38from airbyte.mcp._tool_utils import ( 39 AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET, 40 check_guid_created_in_session, 41 register_guid_created_in_session, 42) 43 44 45CLOUD_AUTH_TIP_TEXT = ( 46 "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, " 47 "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables " 48 "will be used to authenticate with the Airbyte Cloud API." 49) 50WORKSPACE_ID_TIP_TEXT = "Workspace ID. 
Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var." 51 52 53class CloudSourceResult(BaseModel): 54 """Information about a deployed source connector in Airbyte Cloud.""" 55 56 id: str 57 """The source ID.""" 58 name: str 59 """Display name of the source.""" 60 url: str 61 """Web URL for managing this source in Airbyte Cloud.""" 62 63 64class CloudDestinationResult(BaseModel): 65 """Information about a deployed destination connector in Airbyte Cloud.""" 66 67 id: str 68 """The destination ID.""" 69 name: str 70 """Display name of the destination.""" 71 url: str 72 """Web URL for managing this destination in Airbyte Cloud.""" 73 74 75class CloudConnectionResult(BaseModel): 76 """Information about a deployed connection in Airbyte Cloud.""" 77 78 id: str 79 """The connection ID.""" 80 name: str 81 """Display name of the connection.""" 82 url: str 83 """Web URL for managing this connection in Airbyte Cloud.""" 84 source_id: str 85 """ID of the source used by this connection.""" 86 destination_id: str 87 """ID of the destination used by this connection.""" 88 last_job_status: str | None = None 89 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 90 Only populated when with_connection_status=True.""" 91 last_job_id: int | None = None 92 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 93 last_job_time: str | None = None 94 """ISO 8601 timestamp of the most recent completed sync. 95 Only populated when with_connection_status=True.""" 96 currently_running_job_id: int | None = None 97 """Job ID of a currently running sync, if any. 98 Only populated when with_connection_status=True.""" 99 currently_running_job_start_time: str | None = None 100 """ISO 8601 timestamp of when the currently running sync started. 
101 Only populated when with_connection_status=True.""" 102 103 104class CloudSourceDetails(BaseModel): 105 """Detailed information about a deployed source connector in Airbyte Cloud.""" 106 107 source_id: str 108 """The source ID.""" 109 source_name: str 110 """Display name of the source.""" 111 source_url: str 112 """Web URL for managing this source in Airbyte Cloud.""" 113 connector_definition_id: str 114 """The connector definition ID (e.g., the ID for 'source-postgres').""" 115 116 117class CloudDestinationDetails(BaseModel): 118 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 119 120 destination_id: str 121 """The destination ID.""" 122 destination_name: str 123 """Display name of the destination.""" 124 destination_url: str 125 """Web URL for managing this destination in Airbyte Cloud.""" 126 connector_definition_id: str 127 """The connector definition ID (e.g., the ID for 'destination-snowflake').""" 128 129 130class CloudConnectionDetails(BaseModel): 131 """Detailed information about a deployed connection in Airbyte Cloud.""" 132 133 connection_id: str 134 """The connection ID.""" 135 connection_name: str 136 """Display name of the connection.""" 137 connection_url: str 138 """Web URL for managing this connection in Airbyte Cloud.""" 139 source_id: str 140 """ID of the source used by this connection.""" 141 source_name: str 142 """Display name of the source.""" 143 destination_id: str 144 """ID of the destination used by this connection.""" 145 destination_name: str 146 """Display name of the destination.""" 147 selected_streams: list[str] 148 """List of stream names selected for syncing.""" 149 table_prefix: str | None 150 """Table prefix applied when syncing to the destination.""" 151 152 153class CloudOrganizationResult(BaseModel): 154 """Information about an organization in Airbyte Cloud.""" 155 156 id: str 157 """The organization ID.""" 158 name: str 159 """Display name of the organization.""" 160 email: str 161 
"""Email associated with the organization.""" 162 payment_status: str | None = None 163 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 164 When 'disabled', syncs are blocked due to unpaid invoices.""" 165 subscription_status: str | None = None 166 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 167 'unsubscribed').""" 168 is_account_locked: bool = False 169 """Whether the account is locked due to billing issues. 170 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 171 Defaults to False unless we have affirmative evidence of a locked state.""" 172 173 174class CloudWorkspaceResult(BaseModel): 175 """Information about a workspace in Airbyte Cloud.""" 176 177 workspace_id: str 178 """The workspace ID.""" 179 workspace_name: str 180 """Display name of the workspace.""" 181 workspace_url: str | None = None 182 """URL to access the workspace in Airbyte Cloud.""" 183 organization_id: str 184 """ID of the organization (requires ORGANIZATION_READER permission).""" 185 organization_name: str | None = None 186 """Name of the organization (requires ORGANIZATION_READER permission).""" 187 payment_status: str | None = None 188 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 189 When 'disabled', syncs are blocked due to unpaid invoices. 190 Requires ORGANIZATION_READER permission.""" 191 subscription_status: str | None = None 192 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 193 'unsubscribed'). Requires ORGANIZATION_READER permission.""" 194 is_account_locked: bool = False 195 """Whether the account is locked due to billing issues. 196 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 197 Defaults to False unless we have affirmative evidence of a locked state. 
198 Requires ORGANIZATION_READER permission.""" 199 200 201class LogReadResult(BaseModel): 202 """Result of reading sync logs with pagination support.""" 203 204 job_id: int 205 """The job ID the logs belong to.""" 206 attempt_number: int 207 """The attempt number the logs belong to.""" 208 log_text: str 209 """The string containing the log text we are returning.""" 210 log_text_start_line: int 211 """1-based line index of the first line returned.""" 212 log_text_line_count: int 213 """Count of lines we are returning.""" 214 total_log_lines_available: int 215 """Total number of log lines available, shows if any lines were missed due to the limit.""" 216 217 218class SyncJobResult(BaseModel): 219 """Information about a sync job.""" 220 221 job_id: int 222 """The job ID.""" 223 status: str 224 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 225 bytes_synced: int 226 """Number of bytes synced in this job.""" 227 records_synced: int 228 """Number of records synced in this job.""" 229 start_time: str 230 """ISO 8601 timestamp of when the job started.""" 231 job_url: str 232 """URL to view the job in Airbyte Cloud.""" 233 234 235class SyncJobListResult(BaseModel): 236 """Result of listing sync jobs with limit support.""" 237 238 jobs: list[SyncJobResult] 239 """List of sync jobs.""" 240 jobs_count: int 241 """Number of jobs returned in this response.""" 242 from_tail: bool 243 """Whether jobs are ordered newest-first (True) or oldest-first (False).""" 244 245 246def _get_cloud_workspace( 247 ctx: Context, 248 workspace_id: str | None = None, 249) -> CloudWorkspace: 250 """Get an authenticated CloudWorkspace. 251 252 Resolves credentials from multiple sources via MCP config args in order: 253 1. HTTP headers (when running as MCP server with HTTP/SSE transport) 254 2. 
Environment variables 255 256 The ctx parameter provides access to MCP config values that are resolved 257 from HTTP headers or environment variables based on the config args 258 defined in server.py. 259 """ 260 resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID) 261 if not resolved_workspace_id: 262 raise PyAirbyteInputError( 263 message="Workspace ID is required but not provided.", 264 guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.", 265 ) 266 267 return _get_cloud_client(ctx).get_workspace(resolved_workspace_id) 268 269 270def _get_cloud_client( 271 ctx: Context, 272 *, 273 organization_id: str | None = None, 274) -> CloudClient: 275 """Get an authenticated `CloudClient` from MCP config.""" 276 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 277 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 278 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 279 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) 280 config_api_url = get_mcp_config(ctx, MCP_CONFIG_CONFIG_API_URL) 281 282 return CloudClient( 283 client_id=client_id, 284 client_secret=client_secret, 285 bearer_token=bearer_token, 286 public_api_root=api_url, 287 config_api_root=config_api_url, 288 organization_id=organization_id, 289 ) 290 291 292@mcp_tool( 293 open_world=True, 294 extra_help_text=CLOUD_AUTH_TIP_TEXT, 295) 296def deploy_source_to_cloud( 297 ctx: Context, 298 source_name: Annotated[ 299 str, 300 Field(description="The name to use when deploying the source."), 301 ], 302 source_connector_name: Annotated[ 303 str, 304 Field(description="The name of the source connector (e.g., 'source-faker')."), 305 ], 306 *, 307 workspace_id: Annotated[ 308 str | None, 309 Field( 310 description=WORKSPACE_ID_TIP_TEXT, 311 default=None, 312 ), 313 ], 314 config: Annotated[ 315 dict | str | None, 316 Field( 317 description="The configuration for the source connector.", 318 default=None, 319 ), 320 ], 321 
config_secret_name: Annotated[ 322 str | None, 323 Field( 324 description="The name of the secret containing the configuration.", 325 default=None, 326 ), 327 ], 328 unique: Annotated[ 329 bool, 330 Field( 331 description="Whether to require a unique name.", 332 default=True, 333 ), 334 ], 335) -> str: 336 """Deploy a source connector to Airbyte Cloud.""" 337 source = get_source( 338 source_connector_name, 339 no_executor=True, 340 ) 341 config_dict = resolve_connector_config( 342 config=config, 343 config_secret_name=config_secret_name, 344 config_spec_jsonschema=source.config_spec, 345 ) 346 source.set_config(config_dict, validate=True) 347 348 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 349 deployed_source = workspace.deploy_source( 350 name=source_name, 351 source=source, 352 unique=unique, 353 ) 354 355 register_guid_created_in_session(deployed_source.connector_id) 356 return ( 357 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 358 f" and URL: {deployed_source.connector_url}" 359 ) 360 361 362@mcp_tool( 363 open_world=True, 364 extra_help_text=CLOUD_AUTH_TIP_TEXT, 365) 366def deploy_destination_to_cloud( 367 ctx: Context, 368 destination_name: Annotated[ 369 str, 370 Field(description="The name to use when deploying the destination."), 371 ], 372 destination_connector_name: Annotated[ 373 str, 374 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 375 ], 376 *, 377 workspace_id: Annotated[ 378 str | None, 379 Field( 380 description=WORKSPACE_ID_TIP_TEXT, 381 default=None, 382 ), 383 ], 384 config: Annotated[ 385 dict | str | None, 386 Field( 387 description="The configuration for the destination connector.", 388 default=None, 389 ), 390 ], 391 config_secret_name: Annotated[ 392 str | None, 393 Field( 394 description="The name of the secret containing the configuration.", 395 default=None, 396 ), 397 ], 398 unique: Annotated[ 399 bool, 400 Field( 401 
description="Whether to require a unique name.", 402 default=True, 403 ), 404 ], 405) -> str: 406 """Deploy a destination connector to Airbyte Cloud.""" 407 destination = get_destination( 408 destination_connector_name, 409 no_executor=True, 410 ) 411 config_dict = resolve_connector_config( 412 config=config, 413 config_secret_name=config_secret_name, 414 config_spec_jsonschema=destination.config_spec, 415 ) 416 destination.set_config(config_dict, validate=True) 417 418 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 419 deployed_destination = workspace.deploy_destination( 420 name=destination_name, 421 destination=destination, 422 unique=unique, 423 ) 424 425 register_guid_created_in_session(deployed_destination.connector_id) 426 return ( 427 f"Successfully deployed destination '{destination_name}' " 428 f"with ID: {deployed_destination.connector_id}" 429 ) 430 431 432@mcp_tool( 433 open_world=True, 434 extra_help_text=CLOUD_AUTH_TIP_TEXT, 435) 436def create_connection_on_cloud( 437 ctx: Context, 438 connection_name: Annotated[ 439 str, 440 Field(description="The name of the connection."), 441 ], 442 source_id: Annotated[ 443 str, 444 Field(description="The ID of the deployed source."), 445 ], 446 destination_id: Annotated[ 447 str, 448 Field(description="The ID of the deployed destination."), 449 ], 450 selected_streams: Annotated[ 451 str | list[str], 452 Field( 453 description=( 454 "The selected stream names to sync within the connection. " 455 "Must be an explicit stream name or list of streams. " 456 "Cannot be empty or '*'." 
457 ) 458 ), 459 ], 460 *, 461 workspace_id: Annotated[ 462 str | None, 463 Field( 464 description=WORKSPACE_ID_TIP_TEXT, 465 default=None, 466 ), 467 ], 468 table_prefix: Annotated[ 469 str | None, 470 Field( 471 description="Optional table prefix to use when syncing to the destination.", 472 default=None, 473 ), 474 ], 475) -> str: 476 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 477 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 478 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 479 deployed_connection = workspace.deploy_connection( 480 connection_name=connection_name, 481 source=source_id, 482 destination=destination_id, 483 selected_streams=resolved_streams_list, 484 table_prefix=table_prefix, 485 ) 486 487 register_guid_created_in_session(deployed_connection.connection_id) 488 return ( 489 f"Successfully created connection '{connection_name}' " 490 f"with ID '{deployed_connection.connection_id}' and " 491 f"URL: {deployed_connection.connection_url}" 492 ) 493 494 495@mcp_tool( 496 open_world=True, 497 extra_help_text=CLOUD_AUTH_TIP_TEXT, 498) 499def run_cloud_sync( 500 ctx: Context, 501 connection_id: Annotated[ 502 str, 503 Field(description="The ID of the Airbyte Cloud connection."), 504 ], 505 *, 506 workspace_id: Annotated[ 507 str | None, 508 Field( 509 description=WORKSPACE_ID_TIP_TEXT, 510 default=None, 511 ), 512 ], 513 wait: Annotated[ 514 bool, 515 Field( 516 description=( 517 "Whether to wait for the sync to complete. Since a sync can take between several " 518 "minutes and several hours, this option is not recommended for most " 519 "scenarios." 
520 ), 521 default=False, 522 ), 523 ], 524 wait_timeout: Annotated[ 525 int, 526 Field( 527 description="Maximum time to wait for sync completion (seconds).", 528 default=300, 529 ), 530 ], 531) -> str: 532 """Run a sync job on Airbyte Cloud.""" 533 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 534 connection = workspace.get_connection(connection_id=connection_id) 535 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 536 537 if wait: 538 status = sync_result.get_job_status() 539 return ( 540 f"Sync completed with status: {status}. " 541 f"Job ID is '{sync_result.job_id}' and " 542 f"job URL is: {sync_result.job_url}" 543 ) 544 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}" 545 546 547@mcp_tool( 548 read_only=True, 549 idempotent=True, 550 open_world=True, 551 extra_help_text=CLOUD_AUTH_TIP_TEXT, 552) 553def check_airbyte_cloud_workspace( 554 ctx: Context, 555 *, 556 workspace_id: Annotated[ 557 str | None, 558 Field( 559 description=WORKSPACE_ID_TIP_TEXT, 560 default=None, 561 ), 562 ], 563) -> CloudWorkspaceResult: 564 """Check if we have a valid Airbyte Cloud connection and return workspace info. 565 566 Returns workspace details including workspace ID, name, organization info, and billing status. 567 """ 568 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 569 570 # Get workspace details from the public API using workspace's credentials 571 workspace_response = api_util.get_workspace( 572 workspace_id=workspace.workspace_id, 573 api_root=workspace.api_root, 574 client_id=workspace.client_id, 575 client_secret=workspace.client_secret, 576 bearer_token=workspace.bearer_token, 577 ) 578 579 # Try to get organization info (including billing), but fail gracefully if we don't have 580 # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the 581 # organization, which may not be available with workspace-scoped credentials. 
582 organization = workspace.get_organization(raise_on_error=False) 583 584 return CloudWorkspaceResult( 585 workspace_id=workspace_response.workspace_id, 586 workspace_name=workspace_response.name, 587 workspace_url=workspace.workspace_url, 588 organization_id=( 589 organization.organization_id 590 if organization 591 else "[unavailable - requires ORGANIZATION_READER permission]" 592 ), 593 organization_name=organization.organization_name if organization else None, 594 payment_status=organization.payment_status if organization else None, 595 subscription_status=organization.subscription_status if organization else None, 596 is_account_locked=organization.is_account_locked if organization else False, 597 ) 598 599 600@mcp_tool( 601 open_world=True, 602 extra_help_text=CLOUD_AUTH_TIP_TEXT, 603) 604def deploy_noop_destination_to_cloud( 605 ctx: Context, 606 name: str = "No-op Destination", 607 *, 608 workspace_id: Annotated[ 609 str | None, 610 Field( 611 description=WORKSPACE_ID_TIP_TEXT, 612 default=None, 613 ), 614 ], 615 unique: bool = True, 616) -> str: 617 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 618 destination = get_noop_destination() 619 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 620 deployed_destination = workspace.deploy_destination( 621 name=name, 622 destination=destination, 623 unique=unique, 624 ) 625 register_guid_created_in_session(deployed_destination.connector_id) 626 return ( 627 f"Successfully deployed No-op Destination " 628 f"with ID '{deployed_destination.connector_id}' and " 629 f"URL: {deployed_destination.connector_url}" 630 ) 631 632 633@mcp_tool( 634 read_only=True, 635 idempotent=True, 636 open_world=True, 637 extra_help_text=CLOUD_AUTH_TIP_TEXT, 638) 639def get_cloud_sync_status( 640 ctx: Context, 641 connection_id: Annotated[ 642 str, 643 Field( 644 description="The ID of the Airbyte Cloud connection.", 645 ), 646 ], 647 job_id: Annotated[ 648 int | None, 649 Field( 650 
description="Optional job ID. If not provided, the latest job will be used.", 651 default=None, 652 ), 653 ], 654 *, 655 workspace_id: Annotated[ 656 str | None, 657 Field( 658 description=WORKSPACE_ID_TIP_TEXT, 659 default=None, 660 ), 661 ], 662 include_attempts: Annotated[ 663 bool, 664 Field( 665 description="Whether to include detailed attempts information.", 666 default=False, 667 ), 668 ], 669) -> dict[str, Any]: 670 """Get the status of a sync job from the Airbyte Cloud.""" 671 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 672 connection = workspace.get_connection(connection_id=connection_id) 673 674 # If a job ID is provided, get the job by ID. 675 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 676 677 if not sync_result: 678 return {"status": None, "job_id": None, "attempts": []} 679 680 result = { 681 "status": sync_result.get_job_status(), 682 "job_id": sync_result.job_id, 683 "bytes_synced": sync_result.bytes_synced, 684 "records_synced": sync_result.records_synced, 685 "start_time": sync_result.start_time.isoformat(), 686 "job_url": sync_result.job_url, 687 "attempts": [], 688 } 689 690 if include_attempts: 691 attempts = sync_result.get_attempts() 692 result["attempts"] = [ 693 { 694 "attempt_number": attempt.attempt_number, 695 "attempt_id": attempt.attempt_id, 696 "status": attempt.status, 697 "bytes_synced": attempt.bytes_synced, 698 "records_synced": attempt.records_synced, 699 "created_at": attempt.created_at.isoformat(), 700 } 701 for attempt in attempts 702 ] 703 704 return result 705 706 707@mcp_tool( 708 read_only=True, 709 idempotent=True, 710 open_world=True, 711 extra_help_text=CLOUD_AUTH_TIP_TEXT, 712) 713def list_cloud_sync_jobs( 714 ctx: Context, 715 connection_id: Annotated[ 716 str, 717 Field(description="The ID of the Airbyte Cloud connection."), 718 ], 719 *, 720 workspace_id: Annotated[ 721 str | None, 722 Field( 723 description=WORKSPACE_ID_TIP_TEXT, 724 default=None, 725 ), 
726 ], 727 max_jobs: Annotated[ 728 int, 729 Field( 730 description=( 731 "Maximum number of jobs to return. " 732 "Defaults to 20 if not specified. " 733 "Maximum allowed value is 500." 734 ), 735 default=20, 736 ), 737 ], 738 from_tail: Annotated[ 739 bool | None, 740 Field( 741 description=( 742 "When True, jobs are ordered newest-first (createdAt DESC). " 743 "When False, jobs are ordered oldest-first (createdAt ASC). " 744 "Defaults to True." 745 ), 746 default=None, 747 ), 748 ], 749 job_type: Annotated[ 750 JobTypeEnum | None, 751 Field( 752 description=( 753 "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. " 754 "If not specified, defaults to sync and reset jobs only (API default). " 755 "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs." 756 ), 757 default=None, 758 ), 759 ], 760) -> SyncJobListResult: 761 """List sync jobs for a connection with limit support. 762 763 This tool allows you to retrieve a list of sync jobs for a connection, 764 with control over ordering and result limit. By default, jobs are returned 765 newest-first (`from_tail=True`). 
766 """ 767 if from_tail is None: 768 from_tail = True 769 770 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 771 connection = workspace.get_connection(connection_id=connection_id) 772 773 # Cap at 500 to avoid overloading agent context 774 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 775 776 sync_results = connection.get_previous_sync_logs( 777 limit=effective_limit, 778 from_tail=from_tail, 779 job_type=job_type, 780 ) 781 782 jobs = [ 783 SyncJobResult( 784 job_id=sync_result.job_id, 785 status=str(sync_result.get_job_status()), 786 bytes_synced=sync_result.bytes_synced, 787 records_synced=sync_result.records_synced, 788 start_time=sync_result.start_time.isoformat(), 789 job_url=sync_result.job_url, 790 ) 791 for sync_result in sync_results 792 ] 793 794 return SyncJobListResult( 795 jobs=jobs, 796 jobs_count=len(jobs), 797 from_tail=from_tail, 798 ) 799 800 801@mcp_tool( 802 read_only=True, 803 idempotent=True, 804 open_world=True, 805 extra_help_text=CLOUD_AUTH_TIP_TEXT, 806) 807def list_deployed_cloud_source_connectors( 808 ctx: Context, 809 *, 810 workspace_id: Annotated[ 811 str | None, 812 Field( 813 description=WORKSPACE_ID_TIP_TEXT, 814 default=None, 815 ), 816 ], 817 name_contains: Annotated[ 818 str | None, 819 Field( 820 description="Optional case-insensitive substring to filter sources by name", 821 default=None, 822 ), 823 ], 824 limit: Annotated[ 825 int | None, 826 Field( 827 description="Optional maximum number of items to return (default: no limit)", 828 default=None, 829 ), 830 ], 831) -> list[CloudSourceResult]: 832 """List all deployed source connectors in the Airbyte Cloud workspace.""" 833 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 834 sources = workspace.list_sources(limit=None if name_contains else limit) 835 836 # Filter by name if requested 837 if name_contains: 838 needle = name_contains.lower() 839 sources = [s for s in sources if s.name is not None and needle in 
s.name.lower()] 840 if limit is not None: 841 sources = sources[:limit] 842 843 # Note: name and url are guaranteed non-null from list API responses 844 return [ 845 CloudSourceResult( 846 id=source.source_id, 847 name=cast(str, source.name), 848 url=cast(str, source.connector_url), 849 ) 850 for source in sources 851 ] 852 853 854@mcp_tool( 855 read_only=True, 856 idempotent=True, 857 open_world=True, 858 extra_help_text=CLOUD_AUTH_TIP_TEXT, 859) 860def list_deployed_cloud_destination_connectors( 861 ctx: Context, 862 *, 863 workspace_id: Annotated[ 864 str | None, 865 Field( 866 description=WORKSPACE_ID_TIP_TEXT, 867 default=None, 868 ), 869 ], 870 name_contains: Annotated[ 871 str | None, 872 Field( 873 description="Optional case-insensitive substring to filter destinations by name", 874 default=None, 875 ), 876 ], 877 limit: Annotated[ 878 int | None, 879 Field( 880 description="Optional maximum number of items to return (default: no limit)", 881 default=None, 882 ), 883 ], 884) -> list[CloudDestinationResult]: 885 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 886 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 887 destinations = workspace.list_destinations(limit=None if name_contains else limit) 888 889 # Filter by name if requested 890 if name_contains: 891 needle = name_contains.lower() 892 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 893 if limit is not None: 894 destinations = destinations[:limit] 895 896 # Note: name and url are guaranteed non-null from list API responses 897 return [ 898 CloudDestinationResult( 899 id=destination.destination_id, 900 name=cast(str, destination.name), 901 url=cast(str, destination.connector_url), 902 ) 903 for destination in destinations 904 ] 905 906 907@mcp_tool( 908 read_only=True, 909 idempotent=True, 910 open_world=True, 911 extra_help_text=CLOUD_AUTH_TIP_TEXT, 912) 913def describe_cloud_source( 914 ctx: Context, 915 
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)

    # Access name property to ensure _connector_info is populated
    source_name = cast(str, source.name)

    return CloudSourceDetails(
        source_id=source.source_id,
        source_name=source_name,
        source_url=source.connector_url,
        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Access name property to ensure _connector_info is populated
    destination_name = cast(str, destination.name)

    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=destination_name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    return CloudConnectionDetails(
        connection_id=connection.connection_id,
        connection_name=cast(str, connection.name),
        connection_url=cast(str, connection.connection_url),
        source_id=connection.source_id,
        source_name=cast(str, connection.source.name),
        destination_id=connection.destination_id,
        destination_name=cast(str, connection.destination.name),
        selected_streams=connection.stream_names,
        table_prefix=connection.table_prefix,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connections = workspace.list_connections(
        limit=None if name_contains or failing_connections_only else limit
    )

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        if limit is not None and len(results) >= limit:
            break

    return results


def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    client: CloudClient,
) -> str:
    """Resolve organization ID from either ID or exact name match."""
    if organization_id is not None:
        return organization_id

    org = client.get_organization(
        organization_id=organization_id,
        organization_name=organization_name,
    )
    return org.organization_id


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    client = _get_cloud_client(ctx)

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        client=client,
    )

    workspaces = client.list_workspaces(
        organization_id=resolved_org_id,
        name_contains=name_contains,
        limit=limit,
    )

    return [
        CloudWorkspaceResult(
            workspace_id=ws.get("workspaceId", ""),
            workspace_name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    org = _get_cloud_client(ctx).get_organization(
        organization_id=organization_id,
        organization_name=organization_name,
    )

    # CloudOrganization has lazy loading of billing properties
    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )


def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    return "\n".join(
        [
            f" - Custom Source Name: {custom_source.name}",
            f" - Definition ID: {custom_source.definition_id}",
            f" - Definition Version: {custom_source.version}",
            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
        ]
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_draft: Annotated[
        bool,
        Field(
            description=(
                "Whether to include the Connector Builder draft manifest in the response. "
                "If True and a draft exists, the response will include 'has_draft' and "
                "'draft_manifest' fields. Defaults to False."
            ),
            default=False,
        ),
    ] = False,
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the published manifest YAML content.
    Optionally includes the Connector Builder draft manifest (unpublished changes)
    when include_draft=True.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    result: dict[str, Any] = {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "version": definition.version,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "manifest": definition.manifest,
    }

    if include_draft:
        result["has_draft"] = definition.has_draft
        result["draft_manifest"] = definition.draft_manifest

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connector_builder_draft_manifest(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve the draft for."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the Connector Builder draft manifest for a custom source definition.

    Returns the working draft manifest that has been saved in the Connector Builder UI
    but not yet published. This is useful for inspecting what a user is currently working
    on before they publish their changes.

    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
    The published manifest is always included for comparison.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    return {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "has_draft": definition.has_draft,
        "draft_manifest": definition.draft_manifest,
        "published_manifest": definition.manifest,
    }


@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    if processed_manifest is not None:
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )


@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = definition.name

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
    )
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.
2032 2033 The provided name must match the actual name of the connection for the operation to proceed. 2034 This is a safety measure to ensure you are deleting the correct resource. 2035 """ 2036 check_guid_created_in_session(connection_id) 2037 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2038 connection = workspace.get_connection(connection_id=connection_id) 2039 actual_name: str = cast(str, connection.name) 2040 2041 # Verify the name matches 2042 if actual_name != name: 2043 raise PyAirbyteInputError( 2044 message=( 2045 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2046 "The provided name must exactly match the connection's actual name. " 2047 "This is a safety measure to prevent accidental deletion." 2048 ), 2049 context={ 2050 "connection_id": connection_id, 2051 "expected_name": name, 2052 "actual_name": actual_name, 2053 }, 2054 ) 2055 2056 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2057 workspace.permanently_delete_connection( 2058 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2059 connection=connection_id, 2060 cascade_delete_source=cascade_delete_source, 2061 cascade_delete_destination=cascade_delete_destination, 2062 ) 2063 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" 2064 2065 2066@mcp_tool( 2067 open_world=True, 2068 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2069) 2070def rename_cloud_source( 2071 ctx: Context, 2072 source_id: Annotated[ 2073 str, 2074 Field(description="The ID of the deployed source to rename."), 2075 ], 2076 name: Annotated[ 2077 str, 2078 Field(description="New name for the source."), 2079 ], 2080 *, 2081 workspace_id: Annotated[ 2082 str | None, 2083 Field( 2084 description=WORKSPACE_ID_TIP_TEXT, 2085 default=None, 2086 ), 2087 ], 2088) -> str: 2089 """Rename a deployed source connector on Airbyte Cloud.""" 2090 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2091 source = 
workspace.get_source(source_id=source_id) 2092 source.rename(name=name) 2093 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}" 2094 2095 2096@mcp_tool( 2097 destructive=True, 2098 open_world=True, 2099 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2100) 2101def update_cloud_source_config( 2102 ctx: Context, 2103 source_id: Annotated[ 2104 str, 2105 Field(description="The ID of the deployed source to update."), 2106 ], 2107 config: Annotated[ 2108 dict | str, 2109 Field( 2110 description="New configuration for the source connector.", 2111 ), 2112 ], 2113 config_secret_name: Annotated[ 2114 str | None, 2115 Field( 2116 description="The name of the secret containing the configuration.", 2117 default=None, 2118 ), 2119 ] = None, 2120 *, 2121 workspace_id: Annotated[ 2122 str | None, 2123 Field( 2124 description=WORKSPACE_ID_TIP_TEXT, 2125 default=None, 2126 ), 2127 ], 2128) -> str: 2129 """Update a deployed source connector's configuration on Airbyte Cloud. 2130 2131 This is a destructive operation that can break existing connections if the 2132 configuration is changed incorrectly. Use with caution. 2133 """ 2134 check_guid_created_in_session(source_id) 2135 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2136 source = workspace.get_source(source_id=source_id) 2137 2138 config_dict = resolve_connector_config( 2139 config=config, 2140 config_secret_name=config_secret_name, 2141 config_spec_jsonschema=None, # We don't have the spec here 2142 ) 2143 2144 source.update_config(config=config_dict) 2145 return f"Successfully updated source '{source_id}'. 
URL: {source.connector_url}" 2146 2147 2148@mcp_tool( 2149 open_world=True, 2150 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2151) 2152def rename_cloud_destination( 2153 ctx: Context, 2154 destination_id: Annotated[ 2155 str, 2156 Field(description="The ID of the deployed destination to rename."), 2157 ], 2158 name: Annotated[ 2159 str, 2160 Field(description="New name for the destination."), 2161 ], 2162 *, 2163 workspace_id: Annotated[ 2164 str | None, 2165 Field( 2166 description=WORKSPACE_ID_TIP_TEXT, 2167 default=None, 2168 ), 2169 ], 2170) -> str: 2171 """Rename a deployed destination connector on Airbyte Cloud.""" 2172 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2173 destination = workspace.get_destination(destination_id=destination_id) 2174 destination.rename(name=name) 2175 return ( 2176 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2177 f"URL: {destination.connector_url}" 2178 ) 2179 2180 2181@mcp_tool( 2182 destructive=True, 2183 open_world=True, 2184 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2185) 2186def update_cloud_destination_config( 2187 ctx: Context, 2188 destination_id: Annotated[ 2189 str, 2190 Field(description="The ID of the deployed destination to update."), 2191 ], 2192 config: Annotated[ 2193 dict | str, 2194 Field( 2195 description="New configuration for the destination connector.", 2196 ), 2197 ], 2198 config_secret_name: Annotated[ 2199 str | None, 2200 Field( 2201 description="The name of the secret containing the configuration.", 2202 default=None, 2203 ), 2204 ], 2205 *, 2206 workspace_id: Annotated[ 2207 str | None, 2208 Field( 2209 description=WORKSPACE_ID_TIP_TEXT, 2210 default=None, 2211 ), 2212 ], 2213) -> str: 2214 """Update a deployed destination connector's configuration on Airbyte Cloud. 2215 2216 This is a destructive operation that can break existing connections if the 2217 configuration is changed incorrectly. Use with caution. 
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )


@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive.
    """
    check_guid_created_in_session(connection_id)

    # Validate that at least one setting is provided
    if enabled is None and cron_expression is None and manual_schedule is None:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # Validate mutually exclusive schedule options
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    changes_made: list[str] = []

    # Apply enabled status change
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any] | list[dict[str, Any]]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    By default, returns artifacts in Airbyte protocol format (snake_case,
    suitable for passing to connector CLI flags like `--state` or `--catalog`).

    Retrieves the specified artifact for a connection:
    - `state`: Returns a list of protocol-format `AirbyteStateMessage` dicts,
      or `{"ERROR": "..."}` if no state is set.
    - `catalog`: Returns the protocol-format `ConfiguredAirbyteCatalog` dict,
      or `{"ERROR": "..."}` if not found.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    if artifact_type == "state":
        state = connection.dump_raw_state()
        if not state:
            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
        return state

    # artifact_type == "catalog"
    catalog = connection.dump_raw_catalog()
    if catalog is None:
        return {"ERROR": "No catalog found for this connection"}
    return catalog


def _add_defaults_for_exclude_args(
    exclude_args: list[str],
) -> None:
    """Patch registered tool functions to add Python-level defaults for excluded args.

    FastMCP requires that excluded args have Python-level default values, but MCP tool
    functions should only use Field(default=...) in their Annotated type hints (not
    Python-level `= None`). This function bridges the gap by dynamically adding Python
    defaults to the function signatures at registration time, so the source code stays
    clean while satisfying FastMCP's requirement.

    Args:
        exclude_args: List of argument names that will be excluded from the tool schema.
    """
    import inspect  # noqa: PLC0415  # Local import for optional patching logic

    from fastmcp_extensions.decorators import (  # noqa: PLC0415
        _REGISTERED_TOOLS,  # noqa: PLC2701
    )

    for func, _annotations in _REGISTERED_TOOLS:
        sig = inspect.signature(func)
        needs_patch = any(
            arg_name in sig.parameters
            and sig.parameters[arg_name].default is inspect.Parameter.empty
            for arg_name in exclude_args
        )
        if needs_patch:
            new_params = [
                p.replace(default=None)
                if name in exclude_args and p.default is inspect.Parameter.empty
                else p
                for name, p in sig.parameters.items()
            ]
            func.__signature__ = sig.replace(parameters=new_params)  # type: ignore[attr-defined]


def register_cloud_tools(app: FastMCP) -> None:
    """Register cloud tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
    if exclude_args:
        _add_defaults_for_exclude_args(exclude_args)
    register_mcp_tools(
        app,
        mcp_module=__name__,
        exclude_args=exclude_args,
    )
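The signature patching done by _add_defaults_for_exclude_args can be tried in isolation. The following is a minimal sketch using only the standard library's inspect module. The names example_tool and patch_defaults are hypothetical and exist only for illustration; they are not part of the airbyte-mcp server, and the sketch does not involve FastMCP or the _REGISTERED_TOOLS registry.

```python
import inspect
from typing import Optional


def example_tool(connection_id: str, *, workspace_id: Optional[str]) -> str:
    """Hypothetical tool whose keyword-only arg has no Python-level default."""
    return f"{connection_id}:{workspace_id}"


def patch_defaults(func, exclude_args: list) -> None:
    """Advertise a default of None for each excluded arg that lacks a default."""
    sig = inspect.signature(func)
    new_params = [
        p.replace(default=None)
        if name in exclude_args and p.default is inspect.Parameter.empty
        else p
        for name, p in sig.parameters.items()
    ]
    # inspect.signature() consults __signature__ first, so introspection-based
    # callers now see the default. The function object itself is unchanged.
    func.__signature__ = sig.replace(parameters=new_params)


patch_defaults(example_tool, ["workspace_id"])
patched = inspect.signature(example_tool)
print(patched.parameters["workspace_id"].default)
```

Note that assigning __signature__ only changes what introspection reports. A direct Python call such as example_tool("abc") still raises TypeError, because the function's real keyword defaults are untouched; the patch helps only callers that build their arguments from the inspected signature.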