airbyte.mcp.cloud_ops

Airbyte Cloud MCP operations.

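The tools below are plain Python functions registered via the `@mcp_tool` decorator. A minimal sketch of exercising one of them directly, assuming the decorator leaves the functions callable and that credentials are supplied through the documented environment variables (the values shown are placeholders):

    import os

    from airbyte.mcp import cloud_ops

    # Placeholder credentials; substitute real values from your Airbyte Cloud settings.
    os.environ["AIRBYTE_CLOUD_CLIENT_ID"] = "<client-id>"
    os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"] = "<client-secret>"
    os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"] = "<workspace-id>"

    # Confirm connectivity and print basic workspace details.
    info = cloud_ops.check_airbyte_cloud_workspace(workspace_id=None)
    print(info.workspace_name, info.workspace_url)
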
   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any, Literal, cast
   6
   7from fastmcp import FastMCP
   8from pydantic import BaseModel, Field
   9
  10from airbyte import cloud, get_destination, get_source
  11from airbyte._util import api_util
  12from airbyte.cloud.connectors import CustomCloudSourceDefinition
  13from airbyte.cloud.constants import FAILED_STATUSES
  14from airbyte.cloud.workspaces import CloudWorkspace
  15from airbyte.destinations.util import get_noop_destination
  16from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  17from airbyte.mcp._tool_utils import (
  18    check_guid_created_in_session,
  19    mcp_tool,
  20    register_guid_created_in_session,
  21    register_tools,
  22)
  23from airbyte.mcp._util import (
  24    resolve_cloud_credentials,
  25    resolve_config,
  26    resolve_list_of_strings,
  27    resolve_workspace_id,
  28)
  29from airbyte.secrets import SecretString
  30
  31
  32CLOUD_AUTH_TIP_TEXT = (
  33    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
  34    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
  35    "will be used to authenticate with the Airbyte Cloud API."
  36)
  37WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  38
  39
  40class CloudSourceResult(BaseModel):
  41    """Information about a deployed source connector in Airbyte Cloud."""
  42
  43    id: str
  44    """The source ID."""
  45    name: str
  46    """Display name of the source."""
  47    url: str
  48    """Web URL for managing this source in Airbyte Cloud."""
  49
  50
  51class CloudDestinationResult(BaseModel):
  52    """Information about a deployed destination connector in Airbyte Cloud."""
  53
  54    id: str
  55    """The destination ID."""
  56    name: str
  57    """Display name of the destination."""
  58    url: str
  59    """Web URL for managing this destination in Airbyte Cloud."""
  60
  61
  62class CloudConnectionResult(BaseModel):
  63    """Information about a deployed connection in Airbyte Cloud."""
  64
  65    id: str
  66    """The connection ID."""
  67    name: str
  68    """Display name of the connection."""
  69    url: str
  70    """Web URL for managing this connection in Airbyte Cloud."""
  71    source_id: str
  72    """ID of the source used by this connection."""
  73    destination_id: str
  74    """ID of the destination used by this connection."""
  75    last_job_status: str | None = None
  76    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
  77    Only populated when with_connection_status=True."""
  78    last_job_id: int | None = None
  79    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
  80    last_job_time: str | None = None
  81    """ISO 8601 timestamp of the most recent completed sync.
  82    Only populated when with_connection_status=True."""
  83    currently_running_job_id: int | None = None
  84    """Job ID of a currently running sync, if any.
  85    Only populated when with_connection_status=True."""
  86    currently_running_job_start_time: str | None = None
  87    """ISO 8601 timestamp of when the currently running sync started.
  88    Only populated when with_connection_status=True."""
  89
  90
  91class CloudSourceDetails(BaseModel):
  92    """Detailed information about a deployed source connector in Airbyte Cloud."""
  93
  94    source_id: str
  95    """The source ID."""
  96    source_name: str
  97    """Display name of the source."""
  98    source_url: str
  99    """Web URL for managing this source in Airbyte Cloud."""
 100    connector_definition_id: str
 101    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 102
 103
 104class CloudDestinationDetails(BaseModel):
 105    """Detailed information about a deployed destination connector in Airbyte Cloud."""
 106
 107    destination_id: str
 108    """The destination ID."""
 109    destination_name: str
 110    """Display name of the destination."""
 111    destination_url: str
 112    """Web URL for managing this destination in Airbyte Cloud."""
 113    connector_definition_id: str
 114    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 115
 116
 117class CloudConnectionDetails(BaseModel):
 118    """Detailed information about a deployed connection in Airbyte Cloud."""
 119
 120    connection_id: str
 121    """The connection ID."""
 122    connection_name: str
 123    """Display name of the connection."""
 124    connection_url: str
 125    """Web URL for managing this connection in Airbyte Cloud."""
 126    source_id: str
 127    """ID of the source used by this connection."""
 128    source_name: str
 129    """Display name of the source."""
 130    destination_id: str
 131    """ID of the destination used by this connection."""
 132    destination_name: str
 133    """Display name of the destination."""
 134    selected_streams: list[str]
 135    """List of stream names selected for syncing."""
 136    table_prefix: str | None
 137    """Table prefix applied when syncing to the destination."""
 138
 139
 140class CloudOrganizationResult(BaseModel):
 141    """Information about an organization in Airbyte Cloud."""
 142
 143    id: str
 144    """The organization ID."""
 145    name: str
 146    """Display name of the organization."""
 147    email: str
 148    """Email associated with the organization."""
 149
 150
 151class CloudWorkspaceResult(BaseModel):
 152    """Information about a workspace in Airbyte Cloud."""
 153
 154    workspace_id: str
 155    """The workspace ID."""
 156    workspace_name: str
 157    """Display name of the workspace."""
 158    workspace_url: str | None = None
 159    """URL to access the workspace in Airbyte Cloud."""
 160    organization_id: str
 161    """ID of the organization (requires ORGANIZATION_READER permission)."""
 162    organization_name: str | None = None
 163    """Name of the organization (requires ORGANIZATION_READER permission)."""
 164
 165
 166class LogReadResult(BaseModel):
 167    """Result of reading sync logs with pagination support."""
 168
 169    job_id: int
 170    """The job ID the logs belong to."""
 171    attempt_number: int
 172    """The attempt number the logs belong to."""
 173    log_text: str
 174    """The string containing the log text we are returning."""
 175    log_text_start_line: int
 176    """1-based line index of the first line returned."""
 177    log_text_line_count: int
 178    """Count of lines we are returning."""
 179    total_log_lines_available: int
 180    """Total number of log lines available, shows if any lines were missed due to the limit."""
 181
 182
 183class SyncJobResult(BaseModel):
 184    """Information about a sync job."""
 185
 186    job_id: int
 187    """The job ID."""
 188    status: str
 189    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
 190    bytes_synced: int
 191    """Number of bytes synced in this job."""
 192    records_synced: int
 193    """Number of records synced in this job."""
 194    start_time: str
 195    """ISO 8601 timestamp of when the job started."""
 196    job_url: str
 197    """URL to view the job in Airbyte Cloud."""
 198
 199
 200class SyncJobListResult(BaseModel):
 201    """Result of listing sync jobs with pagination support."""
 202
 203    jobs: list[SyncJobResult]
 204    """List of sync jobs."""
 205    jobs_count: int
 206    """Number of jobs returned in this response."""
 207    jobs_offset: int
 208    """Offset used for this request (0 if not specified)."""
 209    from_tail: bool
 210    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 211
 212
 213def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
 214    """Get an authenticated CloudWorkspace.
 215
 216    Resolves credentials from multiple sources in order:
 217    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 218    2. Environment variables
 219
 220    Args:
 221        workspace_id: Optional workspace ID. If not provided, uses HTTP headers
 222            or the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
 223    """
 224    credentials = resolve_cloud_credentials()
 225    resolved_workspace_id = resolve_workspace_id(workspace_id)
 226
 227    return CloudWorkspace(
 228        workspace_id=resolved_workspace_id,
 229        client_id=credentials.client_id,
 230        client_secret=credentials.client_secret,
 231        bearer_token=credentials.bearer_token,
 232        api_root=credentials.api_root,
 233    )
 234
 235
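As a sketch, the internal helper above resolves either the default workspace from `AIRBYTE_CLOUD_WORKSPACE_ID` or an explicitly supplied ID (the ID below is a hypothetical placeholder); credentials resolve as in the example near the top of this page:

    # Fall back to AIRBYTE_CLOUD_WORKSPACE_ID when no ID is passed.
    default_ws = _get_cloud_workspace()

    # Or target a specific workspace explicitly (placeholder ID).
    other_ws = _get_cloud_workspace("00000000-0000-0000-0000-000000000000")
    print(other_ws.workspace_url)
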
 236@mcp_tool(
 237    domain="cloud",
 238    open_world=True,
 239    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 240)
 241def deploy_source_to_cloud(
 242    source_name: Annotated[
 243        str,
 244        Field(description="The name to use when deploying the source."),
 245    ],
 246    source_connector_name: Annotated[
 247        str,
 248        Field(description="The name of the source connector (e.g., 'source-faker')."),
 249    ],
 250    *,
 251    workspace_id: Annotated[
 252        str | None,
 253        Field(
 254            description=WORKSPACE_ID_TIP_TEXT,
 255            default=None,
 256        ),
 257    ],
 258    config: Annotated[
 259        dict | str | None,
 260        Field(
 261            description="The configuration for the source connector.",
 262            default=None,
 263        ),
 264    ],
 265    config_secret_name: Annotated[
 266        str | None,
 267        Field(
 268            description="The name of the secret containing the configuration.",
 269            default=None,
 270        ),
 271    ],
 272    unique: Annotated[
 273        bool,
 274        Field(
 275            description="Whether to require a unique name.",
 276            default=True,
 277        ),
 278    ],
 279) -> str:
 280    """Deploy a source connector to Airbyte Cloud."""
 281    source = get_source(
 282        source_connector_name,
 283        no_executor=True,
 284    )
 285    config_dict = resolve_config(
 286        config=config,
 287        config_secret_name=config_secret_name,
 288        config_spec_jsonschema=source.config_spec,
 289    )
 290    source.set_config(config_dict, validate=True)
 291
 292    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 293    deployed_source = workspace.deploy_source(
 294        name=source_name,
 295        source=source,
 296        unique=unique,
 297    )
 298
 299    register_guid_created_in_session(deployed_source.connector_id)
 300    return (
 301        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 302        f" and URL: {deployed_source.connector_url}"
 303    )
 304
 305
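For illustration, a hedged sketch of deploying a `source-faker` instance with an inline config, continuing the credential setup from the sketch near the top of this page (the config keys shown are illustrative; the connector spec is the authoritative reference):

    from airbyte.mcp import cloud_ops

    message = cloud_ops.deploy_source_to_cloud(
        source_name="My Faker Source",
        source_connector_name="source-faker",
        workspace_id=None,               # use AIRBYTE_CLOUD_WORKSPACE_ID
        config={"count": 1000},          # illustrative; validated against the connector spec
        config_secret_name=None,
        unique=True,
    )
    print(message)  # includes the new source ID and URL
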
 306@mcp_tool(
 307    domain="cloud",
 308    open_world=True,
 309    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 310)
 311def deploy_destination_to_cloud(
 312    destination_name: Annotated[
 313        str,
 314        Field(description="The name to use when deploying the destination."),
 315    ],
 316    destination_connector_name: Annotated[
 317        str,
 318        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 319    ],
 320    *,
 321    workspace_id: Annotated[
 322        str | None,
 323        Field(
 324            description=WORKSPACE_ID_TIP_TEXT,
 325            default=None,
 326        ),
 327    ],
 328    config: Annotated[
 329        dict | str | None,
 330        Field(
 331            description="The configuration for the destination connector.",
 332            default=None,
 333        ),
 334    ],
 335    config_secret_name: Annotated[
 336        str | None,
 337        Field(
 338            description="The name of the secret containing the configuration.",
 339            default=None,
 340        ),
 341    ],
 342    unique: Annotated[
 343        bool,
 344        Field(
 345            description="Whether to require a unique name.",
 346            default=True,
 347        ),
 348    ],
 349) -> str:
 350    """Deploy a destination connector to Airbyte Cloud."""
 351    destination = get_destination(
 352        destination_connector_name,
 353        no_executor=True,
 354    )
 355    config_dict = resolve_config(
 356        config=config,
 357        config_secret_name=config_secret_name,
 358        config_spec_jsonschema=destination.config_spec,
 359    )
 360    destination.set_config(config_dict, validate=True)
 361
 362    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 363    deployed_destination = workspace.deploy_destination(
 364        name=destination_name,
 365        destination=destination,
 366        unique=unique,
 367    )
 368
 369    register_guid_created_in_session(deployed_destination.connector_id)
 370    return (
 371        f"Successfully deployed destination '{destination_name}' "
 372        f"with ID: {deployed_destination.connector_id}"
 373    )
 374
 375
 376@mcp_tool(
 377    domain="cloud",
 378    open_world=True,
 379    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 380)
 381def create_connection_on_cloud(
 382    connection_name: Annotated[
 383        str,
 384        Field(description="The name of the connection."),
 385    ],
 386    source_id: Annotated[
 387        str,
 388        Field(description="The ID of the deployed source."),
 389    ],
 390    destination_id: Annotated[
 391        str,
 392        Field(description="The ID of the deployed destination."),
 393    ],
 394    selected_streams: Annotated[
 395        str | list[str],
 396        Field(
 397            description=(
 398                "The selected stream names to sync within the connection. "
 399                "Must be an explicit stream name or list of streams. "
 400                "Cannot be empty or '*'."
 401            )
 402        ),
 403    ],
 404    *,
 405    workspace_id: Annotated[
 406        str | None,
 407        Field(
 408            description=WORKSPACE_ID_TIP_TEXT,
 409            default=None,
 410        ),
 411    ],
 412    table_prefix: Annotated[
 413        str | None,
 414        Field(
 415            description="Optional table prefix to use when syncing to the destination.",
 416            default=None,
 417        ),
 418    ],
 419) -> str:
 420    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 421    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 422    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 423    deployed_connection = workspace.deploy_connection(
 424        connection_name=connection_name,
 425        source=source_id,
 426        destination=destination_id,
 427        selected_streams=resolved_streams_list,
 428        table_prefix=table_prefix,
 429    )
 430
 431    register_guid_created_in_session(deployed_connection.connection_id)
 432    return (
 433        f"Successfully created connection '{connection_name}' "
 434        f"with ID '{deployed_connection.connection_id}' and "
 435        f"URL: {deployed_connection.connection_url}"
 436    )
 437
 438
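A sketch of wiring a previously deployed source and destination together; the IDs are placeholders, for example values returned by the deploy tools above and the No-op destination tool below:

    from airbyte.mcp import cloud_ops

    message = cloud_ops.create_connection_on_cloud(
        connection_name="Faker to Dev Null",
        source_id="<source-id>",                 # placeholder, e.g. from deploy_source_to_cloud
        destination_id="<destination-id>",       # placeholder, e.g. from deploy_noop_destination_to_cloud
        selected_streams=["users", "products"],  # explicit stream names; '*' is not allowed
        workspace_id=None,
        table_prefix=None,
    )
    print(message)
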
 439@mcp_tool(
 440    domain="cloud",
 441    open_world=True,
 442    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 443)
 444def run_cloud_sync(
 445    connection_id: Annotated[
 446        str,
 447        Field(description="The ID of the Airbyte Cloud connection."),
 448    ],
 449    *,
 450    workspace_id: Annotated[
 451        str | None,
 452        Field(
 453            description=WORKSPACE_ID_TIP_TEXT,
 454            default=None,
 455        ),
 456    ],
 457    wait: Annotated[
 458        bool,
 459        Field(
 460            description=(
 461                "Whether to wait for the sync to complete. Since a sync can take between several "
 462                "minutes and several hours, this option is not recommended for most "
 463                "scenarios."
 464            ),
 465            default=False,
 466        ),
 467    ],
 468    wait_timeout: Annotated[
 469        int,
 470        Field(
 471            description="Maximum time to wait for sync completion (seconds).",
 472            default=300,
 473        ),
 474    ],
 475) -> str:
 476    """Run a sync job on Airbyte Cloud."""
 477    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 478    connection = workspace.get_connection(connection_id=connection_id)
 479    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 480
 481    if wait:
 482        status = sync_result.get_job_status()
 483        return (
 484            f"Sync completed with status: {status}. "
 485            f"Job ID is '{sync_result.job_id}' and "
 486            f"job URL is: {sync_result.job_url}"
 487        )
 488    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 489
 490
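A sketch of starting a sync without blocking and then polling it with `get_cloud_sync_status`, which is defined further down this page (the connection ID is a placeholder):

    from airbyte.mcp import cloud_ops

    start_msg = cloud_ops.run_cloud_sync(
        connection_id="<connection-id>",  # placeholder
        workspace_id=None,
        wait=False,        # return immediately; syncs can take minutes to hours
        wait_timeout=300,
    )
    print(start_msg)

    status = cloud_ops.get_cloud_sync_status(
        connection_id="<connection-id>",  # placeholder
        job_id=None,                      # None means "latest job"
        workspace_id=None,
        include_attempts=False,
    )
    print(status["status"], status["job_url"])
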
 491@mcp_tool(
 492    domain="cloud",
 493    read_only=True,
 494    idempotent=True,
 495    open_world=True,
 496    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 497)
 498def check_airbyte_cloud_workspace(
 499    *,
 500    workspace_id: Annotated[
 501        str | None,
 502        Field(
 503            description=WORKSPACE_ID_TIP_TEXT,
 504            default=None,
 505        ),
 506    ],
 507) -> CloudWorkspaceResult:
 508    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 509
 510    Returns workspace details including workspace ID, name, and organization info.
 511    """
 512    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 513
 514    # Get workspace details from the public API using workspace's credentials
 515    workspace_response = api_util.get_workspace(
 516        workspace_id=workspace.workspace_id,
 517        api_root=workspace.api_root,
 518        client_id=workspace.client_id,
 519        client_secret=workspace.client_secret,
 520        bearer_token=workspace.bearer_token,
 521    )
 522
 523    # Try to get organization info, but fail gracefully if we don't have permissions.
 524    # Fetching organization info requires ORGANIZATION_READER permissions on the organization,
 525    # which may not be available with workspace-scoped credentials.
 526    organization = workspace.get_organization(raise_on_error=False)
 527
 528    return CloudWorkspaceResult(
 529        workspace_id=workspace_response.workspace_id,
 530        workspace_name=workspace_response.name,
 531        workspace_url=workspace.workspace_url,
 532        organization_id=(
 533            organization.organization_id
 534            if organization
 535            else "[unavailable - requires ORGANIZATION_READER permission]"
 536        ),
 537        organization_name=organization.organization_name if organization else None,
 538    )
 539
 540
 541@mcp_tool(
 542    domain="cloud",
 543    open_world=True,
 544    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 545)
 546def deploy_noop_destination_to_cloud(
 547    name: str = "No-op Destination",
 548    *,
 549    workspace_id: Annotated[
 550        str | None,
 551        Field(
 552            description=WORKSPACE_ID_TIP_TEXT,
 553            default=None,
 554        ),
 555    ],
 556    unique: bool = True,
 557) -> str:
 558    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 559    destination = get_noop_destination()
 560    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 561    deployed_destination = workspace.deploy_destination(
 562        name=name,
 563        destination=destination,
 564        unique=unique,
 565    )
 566    register_guid_created_in_session(deployed_destination.connector_id)
 567    return (
 568        f"Successfully deployed No-op Destination "
 569        f"with ID '{deployed_destination.connector_id}' and "
 570        f"URL: {deployed_destination.connector_url}"
 571    )
 572
 573
 574@mcp_tool(
 575    domain="cloud",
 576    read_only=True,
 577    idempotent=True,
 578    open_world=True,
 579    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 580)
 581def get_cloud_sync_status(
 582    connection_id: Annotated[
 583        str,
 584        Field(
 585            description="The ID of the Airbyte Cloud connection.",
 586        ),
 587    ],
 588    job_id: Annotated[
 589        int | None,
 590        Field(
 591            description="Optional job ID. If not provided, the latest job will be used.",
 592            default=None,
 593        ),
 594    ],
 595    *,
 596    workspace_id: Annotated[
 597        str | None,
 598        Field(
 599            description=WORKSPACE_ID_TIP_TEXT,
 600            default=None,
 601        ),
 602    ],
 603    include_attempts: Annotated[
 604        bool,
 605        Field(
 606            description="Whether to include detailed attempts information.",
 607            default=False,
 608        ),
 609    ],
 610) -> dict[str, Any]:
 611    """Get the status of a sync job from the Airbyte Cloud."""
 612    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 613    connection = workspace.get_connection(connection_id=connection_id)
 614
 615    # Fetch the sync result for the given job ID, or the latest job if none is provided.
 616    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 617
 618    if not sync_result:
 619        return {"status": None, "job_id": None, "attempts": []}
 620
 621    result = {
 622        "status": sync_result.get_job_status(),
 623        "job_id": sync_result.job_id,
 624        "bytes_synced": sync_result.bytes_synced,
 625        "records_synced": sync_result.records_synced,
 626        "start_time": sync_result.start_time.isoformat(),
 627        "job_url": sync_result.job_url,
 628        "attempts": [],
 629    }
 630
 631    if include_attempts:
 632        attempts = sync_result.get_attempts()
 633        result["attempts"] = [
 634            {
 635                "attempt_number": attempt.attempt_number,
 636                "attempt_id": attempt.attempt_id,
 637                "status": attempt.status,
 638                "bytes_synced": attempt.bytes_synced,
 639                "records_synced": attempt.records_synced,
 640                "created_at": attempt.created_at.isoformat(),
 641            }
 642            for attempt in attempts
 643        ]
 644
 645    return result
 646
 647
 648@mcp_tool(
 649    domain="cloud",
 650    read_only=True,
 651    idempotent=True,
 652    open_world=True,
 653    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 654)
 655def list_cloud_sync_jobs(
 656    connection_id: Annotated[
 657        str,
 658        Field(description="The ID of the Airbyte Cloud connection."),
 659    ],
 660    *,
 661    workspace_id: Annotated[
 662        str | None,
 663        Field(
 664            description=WORKSPACE_ID_TIP_TEXT,
 665            default=None,
 666        ),
 667    ],
 668    max_jobs: Annotated[
 669        int,
 670        Field(
 671            description=(
 672                "Maximum number of jobs to return. "
 673                "Defaults to 20 if not specified. "
 674                "Maximum allowed value is 500."
 675            ),
 676            default=20,
 677        ),
 678    ],
 679    from_tail: Annotated[
 680        bool | None,
 681        Field(
 682            description=(
 683                "When True, jobs are ordered newest-first (createdAt DESC). "
 684                "When False, jobs are ordered oldest-first (createdAt ASC). "
 685                "Defaults to True if `jobs_offset` is not specified. "
 686                "Cannot combine `from_tail=True` with `jobs_offset`."
 687            ),
 688            default=None,
 689        ),
 690    ],
 691    jobs_offset: Annotated[
 692        int | None,
 693        Field(
 694            description=(
 695                "Number of jobs to skip from the beginning. "
 696                "Cannot be combined with `from_tail=True`."
 697            ),
 698            default=None,
 699        ),
 700    ],
 701) -> SyncJobListResult:
 702    """List sync jobs for a connection with pagination support.
 703
 704    This tool allows you to retrieve a list of sync jobs for a connection,
 705    with control over ordering and pagination. By default, jobs are returned
 706    newest-first (from_tail=True).
 707    """
 708    # Validate that jobs_offset and from_tail are not both set
 709    if jobs_offset is not None and from_tail is True:
 710        raise PyAirbyteInputError(
 711            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
 712            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
 713        )
 714
 715    # Default to from_tail=True if neither is specified
 716    if from_tail is None and jobs_offset is None:
 717        from_tail = True
 718    elif from_tail is None:
 719        from_tail = False
 720
 721    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 722    connection = workspace.get_connection(connection_id=connection_id)
 723
 724    # Cap at 500 to avoid overloading agent context; non-positive values fall back to 20.
 725    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 726
 727    sync_results = connection.get_previous_sync_logs(
 728        limit=effective_limit,
 729        offset=jobs_offset,
 730        from_tail=from_tail,
 731    )
 732
 733    jobs = [
 734        SyncJobResult(
 735            job_id=sync_result.job_id,
 736            status=str(sync_result.get_job_status()),
 737            bytes_synced=sync_result.bytes_synced,
 738            records_synced=sync_result.records_synced,
 739            start_time=sync_result.start_time.isoformat(),
 740            job_url=sync_result.job_url,
 741        )
 742        for sync_result in sync_results
 743    ]
 744
 745    return SyncJobListResult(
 746        jobs=jobs,
 747        jobs_count=len(jobs),
 748        jobs_offset=jobs_offset or 0,
 749        from_tail=from_tail,
 750    )
 751
 752
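A sketch of the two pagination modes described in the docstring above: newest-first by default, or oldest-first with an explicit offset (the connection ID is a placeholder):

    from airbyte.mcp import cloud_ops

    # Newest five jobs; from_tail defaults to True when no offset is given.
    recent = cloud_ops.list_cloud_sync_jobs(
        connection_id="<connection-id>",
        workspace_id=None,
        max_jobs=5,
        from_tail=None,
        jobs_offset=None,
    )

    # Oldest-first, skipping the first ten jobs (cannot be combined with from_tail=True).
    older = cloud_ops.list_cloud_sync_jobs(
        connection_id="<connection-id>",
        workspace_id=None,
        max_jobs=20,
        from_tail=None,
        jobs_offset=10,
    )
    print(recent.jobs_count, older.jobs_count)
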
 753@mcp_tool(
 754    domain="cloud",
 755    read_only=True,
 756    idempotent=True,
 757    open_world=True,
 758    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 759)
 760def list_deployed_cloud_source_connectors(
 761    *,
 762    workspace_id: Annotated[
 763        str | None,
 764        Field(
 765            description=WORKSPACE_ID_TIP_TEXT,
 766            default=None,
 767        ),
 768    ],
 769    name_contains: Annotated[
 770        str | None,
 771        Field(
 772            description="Optional case-insensitive substring to filter sources by name",
 773            default=None,
 774        ),
 775    ],
 776    max_items_limit: Annotated[
 777        int | None,
 778        Field(
 779            description="Optional maximum number of items to return (default: no limit)",
 780            default=None,
 781        ),
 782    ],
 783) -> list[CloudSourceResult]:
 784    """List all deployed source connectors in the Airbyte Cloud workspace."""
 785    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 786    sources = workspace.list_sources()
 787
 788    # Filter by name if requested
 789    if name_contains:
 790        needle = name_contains.lower()
 791        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 792
 793    # Apply limit if requested
 794    if max_items_limit is not None:
 795        sources = sources[:max_items_limit]
 796
 797    # Note: name and url are guaranteed non-null from list API responses
 798    return [
 799        CloudSourceResult(
 800            id=source.source_id,
 801            name=cast(str, source.name),
 802            url=cast(str, source.connector_url),
 803        )
 804        for source in sources
 805    ]
 806
 807
 808@mcp_tool(
 809    domain="cloud",
 810    read_only=True,
 811    idempotent=True,
 812    open_world=True,
 813    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 814)
 815def list_deployed_cloud_destination_connectors(
 816    *,
 817    workspace_id: Annotated[
 818        str | None,
 819        Field(
 820            description=WORKSPACE_ID_TIP_TEXT,
 821            default=None,
 822        ),
 823    ],
 824    name_contains: Annotated[
 825        str | None,
 826        Field(
 827            description="Optional case-insensitive substring to filter destinations by name",
 828            default=None,
 829        ),
 830    ],
 831    max_items_limit: Annotated[
 832        int | None,
 833        Field(
 834            description="Optional maximum number of items to return (default: no limit)",
 835            default=None,
 836        ),
 837    ],
 838) -> list[CloudDestinationResult]:
 839    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 840    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 841    destinations = workspace.list_destinations()
 842
 843    # Filter by name if requested
 844    if name_contains:
 845        needle = name_contains.lower()
 846        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 847
 848    # Apply limit if requested
 849    if max_items_limit is not None:
 850        destinations = destinations[:max_items_limit]
 851
 852    # Note: name and url are guaranteed non-null from list API responses
 853    return [
 854        CloudDestinationResult(
 855            id=destination.destination_id,
 856            name=cast(str, destination.name),
 857            url=cast(str, destination.connector_url),
 858        )
 859        for destination in destinations
 860    ]
 861
 862
 863@mcp_tool(
 864    domain="cloud",
 865    read_only=True,
 866    idempotent=True,
 867    open_world=True,
 868    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 869)
 870def describe_cloud_source(
 871    source_id: Annotated[
 872        str,
 873        Field(description="The ID of the source to describe."),
 874    ],
 875    *,
 876    workspace_id: Annotated[
 877        str | None,
 878        Field(
 879            description=WORKSPACE_ID_TIP_TEXT,
 880            default=None,
 881        ),
 882    ],
 883) -> CloudSourceDetails:
 884    """Get detailed information about a specific deployed source connector."""
 885    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 886    source = workspace.get_source(source_id=source_id)
 887
 888    # Access name property to ensure _connector_info is populated
 889    source_name = cast(str, source.name)
 890
 891    return CloudSourceDetails(
 892        source_id=source.source_id,
 893        source_name=source_name,
 894        source_url=source.connector_url,
 895        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 896    )
 897
 898
 899@mcp_tool(
 900    domain="cloud",
 901    read_only=True,
 902    idempotent=True,
 903    open_world=True,
 904    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 905)
 906def describe_cloud_destination(
 907    destination_id: Annotated[
 908        str,
 909        Field(description="The ID of the destination to describe."),
 910    ],
 911    *,
 912    workspace_id: Annotated[
 913        str | None,
 914        Field(
 915            description=WORKSPACE_ID_TIP_TEXT,
 916            default=None,
 917        ),
 918    ],
 919) -> CloudDestinationDetails:
 920    """Get detailed information about a specific deployed destination connector."""
 921    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 922    destination = workspace.get_destination(destination_id=destination_id)
 923
 924    # Access name property to ensure _connector_info is populated
 925    destination_name = cast(str, destination.name)
 926
 927    return CloudDestinationDetails(
 928        destination_id=destination.destination_id,
 929        destination_name=destination_name,
 930        destination_url=destination.connector_url,
 931        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 932    )
 933
 934
 935@mcp_tool(
 936    domain="cloud",
 937    read_only=True,
 938    idempotent=True,
 939    open_world=True,
 940    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 941)
 942def describe_cloud_connection(
 943    connection_id: Annotated[
 944        str,
 945        Field(description="The ID of the connection to describe."),
 946    ],
 947    *,
 948    workspace_id: Annotated[
 949        str | None,
 950        Field(
 951            description=WORKSPACE_ID_TIP_TEXT,
 952            default=None,
 953        ),
 954    ],
 955) -> CloudConnectionDetails:
 956    """Get detailed information about a specific deployed connection."""
 957    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 958    connection = workspace.get_connection(connection_id=connection_id)
 959
 960    return CloudConnectionDetails(
 961        connection_id=connection.connection_id,
 962        connection_name=cast(str, connection.name),
 963        connection_url=cast(str, connection.connection_url),
 964        source_id=connection.source_id,
 965        source_name=cast(str, connection.source.name),
 966        destination_id=connection.destination_id,
 967        destination_name=cast(str, connection.destination.name),
 968        selected_streams=connection.stream_names,
 969        table_prefix=connection.table_prefix,
 970    )
 971
 972
 973@mcp_tool(
 974    domain="cloud",
 975    read_only=True,
 976    idempotent=True,
 977    open_world=True,
 978    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 979)
 980def get_cloud_sync_logs(
 981    connection_id: Annotated[
 982        str,
 983        Field(description="The ID of the Airbyte Cloud connection."),
 984    ],
 985    job_id: Annotated[
 986        int | None,
 987        Field(description="Optional job ID. If not provided, the latest job will be used."),
 988    ] = None,
 989    attempt_number: Annotated[
 990        int | None,
 991        Field(
 992            description="Optional attempt number. If not provided, the latest attempt will be used."
 993        ),
 994    ] = None,
 995    *,
 996    workspace_id: Annotated[
 997        str | None,
 998        Field(
 999            description=WORKSPACE_ID_TIP_TEXT,
1000            default=None,
1001        ),
1002    ],
1003    max_lines: Annotated[
1004        int,
1005        Field(
1006            description=(
1007                "Maximum number of lines to return. "
1008                "Defaults to 4000 if not specified. "
1009                "If '0' is provided, no limit is applied."
1010            ),
1011            default=4000,
1012        ),
1013    ],
1014    from_tail: Annotated[
1015        bool | None,
1016        Field(
1017            description=(
1018                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
1019                "Defaults to True if `line_offset` is not specified. "
1020                "Cannot combine `from_tail=True` with `line_offset`."
1021            ),
1022            default=None,
1023        ),
1024    ],
1025    line_offset: Annotated[
1026        int | None,
1027        Field(
1028            description=(
1029                "Number of lines to skip from the beginning of the logs. "
1030                "Cannot be combined with `from_tail=True`."
1031            ),
1032            default=None,
1033        ),
1034    ],
1035) -> LogReadResult:
1036    """Get the logs from a sync job attempt on Airbyte Cloud."""
1037    # Validate that line_offset and from_tail are not both set
1038    if line_offset is not None and from_tail:
1039        raise PyAirbyteInputError(
1040            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
1041            context={"line_offset": line_offset, "from_tail": from_tail},
1042        )
1043
1044    if from_tail is None and line_offset is None:
1045        from_tail = True
1046    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1047    connection = workspace.get_connection(connection_id=connection_id)
1048
1049    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
1050
1051    if not sync_result:
1052        raise AirbyteMissingResourceError(
1053            resource_type="sync job",
1054            resource_name_or_id=connection_id,
1055        )
1056
1057    attempts = sync_result.get_attempts()
1058
1059    if not attempts:
1060        raise AirbyteMissingResourceError(
1061            resource_type="sync attempt",
1062            resource_name_or_id=str(sync_result.job_id),
1063        )
1064
1065    if attempt_number is not None:
1066        target_attempt = None
1067        for attempt in attempts:
1068            if attempt.attempt_number == attempt_number:
1069                target_attempt = attempt
1070                break
1071
1072        if target_attempt is None:
1073            raise AirbyteMissingResourceError(
1074                resource_type="sync attempt",
1075                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
1076            )
1077    else:
1078        target_attempt = max(attempts, key=lambda a: a.attempt_number)
1079
1080    logs = target_attempt.get_full_log_text()
1081
1082    if not logs:
1083        # Return empty result with zero lines
1084        return LogReadResult(
1085            log_text=(
1086                f"[No logs available for job '{sync_result.job_id}', "
1087                f"attempt {target_attempt.attempt_number}.]"
1088            ),
1089            log_text_start_line=1,
1090            log_text_line_count=0,
1091            total_log_lines_available=0,
1092            job_id=sync_result.job_id,
1093            attempt_number=target_attempt.attempt_number,
1094        )
1095
1096    # Apply line limiting
1097    log_lines = logs.splitlines()
1098    total_lines = len(log_lines)
1099
1100    # Determine effective max_lines (0 means no limit)
1101    effective_max = total_lines if max_lines == 0 else max_lines
1102
1103    # Calculate start_index and slice based on from_tail or line_offset
1104    if from_tail:
1105        start_index = max(0, total_lines - effective_max)
1106        selected_lines = log_lines[start_index:][:effective_max]
1107    else:
1108        start_index = line_offset or 0
1109        selected_lines = log_lines[start_index : start_index + effective_max]
1110
1111    return LogReadResult(
1112        log_text="\n".join(selected_lines),
1113        log_text_start_line=start_index + 1,  # Convert to 1-based index
1114        log_text_line_count=len(selected_lines),
1115        total_log_lines_available=total_lines,
1116        job_id=sync_result.job_id,
1117        attempt_number=target_attempt.attempt_number,
1118    )
1119
1120
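A sketch of the log pagination behavior implemented above: tail the latest attempt by default, or page forward from the top with an explicit line offset (the connection ID is a placeholder):

    from airbyte.mcp import cloud_ops

    # Tail: the last 200 lines of the latest attempt of the latest job.
    tail = cloud_ops.get_cloud_sync_logs(
        connection_id="<connection-id>",
        workspace_id=None,
        max_lines=200,
        from_tail=None,    # defaults to True because no line_offset is given
        line_offset=None,
    )
    print(tail.log_text_line_count, "of", tail.total_log_lines_available)

    # Paging from the top instead: first 200 lines; continue with line_offset=200, 400, ...
    first_page = cloud_ops.get_cloud_sync_logs(
        connection_id="<connection-id>",
        workspace_id=None,
        max_lines=200,
        from_tail=False,
        line_offset=0,
    )
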
1121@mcp_tool(
1122    domain="cloud",
1123    read_only=True,
1124    idempotent=True,
1125    open_world=True,
1126    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1127)
1128def list_deployed_cloud_connections(
1129    *,
1130    workspace_id: Annotated[
1131        str | None,
1132        Field(
1133            description=WORKSPACE_ID_TIP_TEXT,
1134            default=None,
1135        ),
1136    ],
1137    name_contains: Annotated[
1138        str | None,
1139        Field(
1140            description="Optional case-insensitive substring to filter connections by name",
1141            default=None,
1142        ),
1143    ],
1144    max_items_limit: Annotated[
1145        int | None,
1146        Field(
1147            description="Optional maximum number of items to return (default: no limit)",
1148            default=None,
1149        ),
1150    ],
1151    with_connection_status: Annotated[
1152        bool | None,
1153        Field(
1154            description="If True, include status info for each connection's most recent sync job",
1155            default=False,
1156        ),
1157    ],
1158    failing_connections_only: Annotated[
1159        bool | None,
1160        Field(
1161            description="If True, only return connections with failed/cancelled last sync",
1162            default=False,
1163        ),
1164    ],
1165) -> list[CloudConnectionResult]:
1166    """List all deployed connections in the Airbyte Cloud workspace.
1167
1168    When with_connection_status is True, each connection result will include
1169    information about the most recent sync job status, skipping over any
1170    currently in-progress syncs to find the last completed job.
1171
1172    When failing_connections_only is True, only connections where the most
1173    recent completed sync job failed or was cancelled will be returned.
1174    This implicitly enables with_connection_status.
1175    """
1176    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1177    connections = workspace.list_connections()
1178
1179    # Filter by name if requested
1180    if name_contains:
1181        needle = name_contains.lower()
1182        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1183
1184    # If failing_connections_only is True, implicitly enable with_connection_status
1185    if failing_connections_only:
1186        with_connection_status = True
1187
1188    results: list[CloudConnectionResult] = []
1189
1190    for connection in connections:
1191        last_job_status: str | None = None
1192        last_job_id: int | None = None
1193        last_job_time: str | None = None
1194        currently_running_job_id: int | None = None
1195        currently_running_job_start_time: str | None = None
1196
1197        if with_connection_status:
1198            sync_logs = connection.get_previous_sync_logs(limit=5)
1199            last_completed_job_status = None  # Keep the enum for the FAILED_STATUSES check below
1200
1201            for sync_result in sync_logs:
1202                job_status = sync_result.get_job_status()
1203
1204                if not sync_result.is_job_complete():
1205                    currently_running_job_id = sync_result.job_id
1206                    currently_running_job_start_time = sync_result.start_time.isoformat()
1207                    continue
1208
1209                last_completed_job_status = job_status
1210                last_job_status = str(job_status.value) if job_status else None
1211                last_job_id = sync_result.job_id
1212                last_job_time = sync_result.start_time.isoformat()
1213                break
1214
1215            if failing_connections_only and (
1216                last_completed_job_status is None
1217                or last_completed_job_status not in FAILED_STATUSES
1218            ):
1219                continue
1220
1221        results.append(
1222            CloudConnectionResult(
1223                id=connection.connection_id,
1224                name=cast(str, connection.name),
1225                url=cast(str, connection.connection_url),
1226                source_id=connection.source_id,
1227                destination_id=connection.destination_id,
1228                last_job_status=last_job_status,
1229                last_job_id=last_job_id,
1230                last_job_time=last_job_time,
1231                currently_running_job_id=currently_running_job_id,
1232                currently_running_job_start_time=currently_running_job_start_time,
1233            )
1234        )
1235
1236        if max_items_limit is not None and len(results) >= max_items_limit:
1237            break
1238
1239    return results
1240
1241
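As a sketch, surfacing only the connections whose most recent completed sync failed or was cancelled; per the docstring above, this implicitly turns on the per-connection status lookup:

    from airbyte.mcp import cloud_ops

    failing = cloud_ops.list_deployed_cloud_connections(
        workspace_id=None,
        name_contains=None,
        max_items_limit=None,
        with_connection_status=True,    # implied by failing_connections_only anyway
        failing_connections_only=True,
    )
    for conn in failing:
        print(conn.name, conn.last_job_status, conn.url)
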
1242def _resolve_organization(
1243    organization_id: str | None,
1244    organization_name: str | None,
1245    *,
1246    api_root: str,
1247    client_id: SecretString | None,
1248    client_secret: SecretString | None,
1249    bearer_token: SecretString | None = None,
1250) -> api_util.models.OrganizationResponse:
1251    """Resolve organization from either ID or exact name match.
1252
1253    Args:
1254        organization_id: The organization ID (if provided directly)
1255        organization_name: The organization name (exact match required)
1256        api_root: The API root URL
1257        client_id: OAuth client ID (optional if bearer_token is provided)
1258        client_secret: OAuth client secret (optional if bearer_token is provided)
1259        bearer_token: Bearer token for authentication (optional if client credentials provided)
1260
1261    Returns:
1262        The resolved OrganizationResponse object
1263
1264    Raises:
1265        PyAirbyteInputError: If neither or both parameters are provided,
1266            or if multiple organizations match the exact name
1267        AirbyteMissingResourceError: If no organization matches the given ID or exact name
1268    """
1269    if organization_id and organization_name:
1270        raise PyAirbyteInputError(
1271            message="Provide either 'organization_id' or 'organization_name', not both."
1272        )
1273    if not organization_id and not organization_name:
1274        raise PyAirbyteInputError(
1275            message="Either 'organization_id' or 'organization_name' must be provided."
1276        )
1277
1278    # Get all organizations for the user
1279    orgs = api_util.list_organizations_for_user(
1280        api_root=api_root,
1281        client_id=client_id,
1282        client_secret=client_secret,
1283        bearer_token=bearer_token,
1284    )
1285
1286    if organization_id:
1287        # Find by ID
1288        matching_orgs = [org for org in orgs if org.organization_id == organization_id]
1289        if not matching_orgs:
1290            raise AirbyteMissingResourceError(
1291                resource_type="organization",
1292                context={
1293                    "organization_id": organization_id,
1294                    "message": f"No organization found with ID '{organization_id}' "
1295                    "for the current user.",
1296                },
1297            )
1298        return matching_orgs[0]
1299
1300    # Find by exact name match (case-sensitive)
1301    matching_orgs = [org for org in orgs if org.organization_name == organization_name]
1302
1303    if not matching_orgs:
1304        raise AirbyteMissingResourceError(
1305            resource_type="organization",
1306            context={
1307                "organization_name": organization_name,
1308                "message": f"No organization found with exact name '{organization_name}' "
1309                "for the current user.",
1310            },
1311        )
1312
1313    if len(matching_orgs) > 1:
1314        raise PyAirbyteInputError(
1315            message=f"Multiple organizations found with name '{organization_name}'. "
1316            "Please use 'organization_id' instead to specify the exact organization."
1317        )
1318
1319    return matching_orgs[0]
1320
1321
1322def _resolve_organization_id(
1323    organization_id: str | None,
1324    organization_name: str | None,
1325    *,
1326    api_root: str,
1327    client_id: SecretString | None,
1328    client_secret: SecretString | None,
1329    bearer_token: SecretString | None = None,
1330) -> str:
1331    """Resolve organization ID from either ID or exact name match.
1332
1333    This is a convenience wrapper around _resolve_organization that returns just the ID.
1334    """
1335    org = _resolve_organization(
1336        organization_id=organization_id,
1337        organization_name=organization_name,
1338        api_root=api_root,
1339        client_id=client_id,
1340        client_secret=client_secret,
1341        bearer_token=bearer_token,
1342    )
1343    return org.organization_id
1344
1345
1346@mcp_tool(
1347    domain="cloud",
1348    read_only=True,
1349    idempotent=True,
1350    open_world=True,
1351    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1352)
1353def list_cloud_workspaces(
1354    *,
1355    organization_id: Annotated[
1356        str | None,
1357        Field(
1358            description="Organization ID. Required if organization_name is not provided.",
1359            default=None,
1360        ),
1361    ],
1362    organization_name: Annotated[
1363        str | None,
1364        Field(
1365            description=(
1366                "Organization name (exact match). " "Required if organization_id is not provided."
1367            ),
1368            default=None,
1369        ),
1370    ],
1371    name_contains: Annotated[
1372        str | None,
1373        Field(
1374            description="Optional substring to filter workspaces by name (server-side filtering)",
1375            default=None,
1376        ),
1377    ],
1378    max_items_limit: Annotated[
1379        int | None,
1380        Field(
1381            description="Optional maximum number of items to return (default: no limit)",
1382            default=None,
1383        ),
1384    ],
1385) -> list[CloudWorkspaceResult]:
1386    """List all workspaces in a specific organization.
1387
1388    Requires either organization_id OR organization_name (exact match) to be provided.
1389    This tool will NOT list workspaces across all organizations - you must specify
1390    which organization to list workspaces from.
1391    """
1392    credentials = resolve_cloud_credentials()
1393
1394    resolved_org_id = _resolve_organization_id(
1395        organization_id=organization_id,
1396        organization_name=organization_name,
1397        api_root=credentials.api_root,
1398        client_id=credentials.client_id,
1399        client_secret=credentials.client_secret,
1400        bearer_token=credentials.bearer_token,
1401    )
1402
1403    workspaces = api_util.list_workspaces_in_organization(
1404        organization_id=resolved_org_id,
1405        api_root=credentials.api_root,
1406        client_id=credentials.client_id,
1407        client_secret=credentials.client_secret,
1408        bearer_token=credentials.bearer_token,
1409        name_contains=name_contains,
1410        max_items_limit=max_items_limit,
1411    )
1412
1413    return [
1414        CloudWorkspaceResult(
1415            workspace_id=ws.get("workspaceId", ""),
1416            workspace_name=ws.get("name", ""),
1417            organization_id=ws.get("organizationId", ""),
1418        )
1419        for ws in workspaces
1420    ]
1421
1422
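A sketch of listing workspaces within an organization identified by exact name (the organization and filter values are placeholders):

    from airbyte.mcp import cloud_ops

    workspaces = cloud_ops.list_cloud_workspaces(
        organization_id=None,
        organization_name="Acme Corp",   # exact match; use organization_id if names are ambiguous
        name_contains="prod",            # optional server-side name filter
        max_items_limit=25,
    )
    for ws in workspaces:
        print(ws.workspace_id, ws.workspace_name)
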
1423@mcp_tool(
1424    domain="cloud",
1425    read_only=True,
1426    idempotent=True,
1427    open_world=True,
1428    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1429)
1430def describe_cloud_organization(
1431    *,
1432    organization_id: Annotated[
1433        str | None,
1434        Field(
1435            description="Organization ID. Required if organization_name is not provided.",
1436            default=None,
1437        ),
1438    ],
1439    organization_name: Annotated[
1440        str | None,
1441        Field(
1442            description=(
1443                "Organization name (exact match). " "Required if organization_id is not provided."
1444            ),
1445            default=None,
1446        ),
1447    ],
1448) -> CloudOrganizationResult:
1449    """Get details about a specific organization.
1450
1451    Requires either organization_id OR organization_name (exact match) to be provided.
1452    This tool is useful for looking up an organization's ID from its name, or vice versa.
1453    """
1454    credentials = resolve_cloud_credentials()
1455
1456    org = _resolve_organization(
1457        organization_id=organization_id,
1458        organization_name=organization_name,
1459        api_root=credentials.api_root,
1460        client_id=credentials.client_id,
1461        client_secret=credentials.client_secret,
1462        bearer_token=credentials.bearer_token,
1463    )
1464
1465    return CloudOrganizationResult(
1466        id=org.organization_id,
1467        name=org.organization_name,
1468        email=org.email,
1469    )
1470
1471
1472def _get_custom_source_definition_description(
1473    custom_source: CustomCloudSourceDefinition,
1474) -> str:
1475    return "\n".join(
1476        [
1477            f" - Custom Source Name: {custom_source.name}",
1478            f" - Definition ID: {custom_source.definition_id}",
1479            f" - Definition Version: {custom_source.version}",
1480            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
1481            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
1482        ]
1483    )
1484
1485
1486@mcp_tool(
1487    domain="cloud",
1488    open_world=True,
1489    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1490)
1491def publish_custom_source_definition(
1492    name: Annotated[
1493        str,
1494        Field(description="The name for the custom connector definition."),
1495    ],
1496    *,
1497    workspace_id: Annotated[
1498        str | None,
1499        Field(
1500            description=WORKSPACE_ID_TIP_TEXT,
1501            default=None,
1502        ),
1503    ],
1504    manifest_yaml: Annotated[
1505        str | Path | None,
1506        Field(
1507            description=(
1508                "The Low-code CDK manifest as a YAML string or file path. "
1509                "Required for YAML connectors."
1510            ),
1511            default=None,
1512        ),
1513    ] = None,
1514    unique: Annotated[
1515        bool,
1516        Field(
1517            description="Whether to require a unique name.",
1518            default=True,
1519        ),
1520    ] = True,
1521    pre_validate: Annotated[
1522        bool,
1523        Field(
1524            description="Whether to validate the manifest client-side before publishing.",
1525            default=True,
1526        ),
1527    ] = True,
1528    testing_values: Annotated[
1529        dict | str | None,
1530        Field(
1531            description=(
1532                "Optional testing configuration values for the Builder UI. "
1533                "Can be provided as a JSON object or JSON string. "
1534                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1535                "If provided, these values replace any existing testing values "
1536                "for the connector builder project, allowing immediate test read operations."
1537            ),
1538            default=None,
1539        ),
1540    ],
1541    testing_values_secret_name: Annotated[
1542        str | None,
1543        Field(
1544            description=(
1545                "Optional name of a secret containing testing configuration values "
1546                "in JSON or YAML format. The secret will be resolved by the MCP "
1547                "server and merged into testing_values, with secret values taking "
1548                "precedence. This lets the agent reference secrets without sending "
1549                "raw values as tool arguments."
1550            ),
1551            default=None,
1552        ),
1553    ],
1554) -> str:
1555    """Publish a custom YAML source connector definition to Airbyte Cloud.
1556
1557    Note: Only YAML (declarative) connectors are currently supported.
1558    Docker-based custom sources are not yet available.
1559    """
1560    processed_manifest = manifest_yaml
1561    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1562        processed_manifest = Path(manifest_yaml)  # single-line strings are treated as file paths
1563
1564    # Resolve testing values from inline config and/or secret
1565    testing_values_dict: dict[str, Any] | None = None
1566    if testing_values is not None or testing_values_secret_name is not None:
1567        testing_values_dict = (
1568            resolve_config(
1569                config=testing_values,
1570                config_secret_name=testing_values_secret_name,
1571            )
1572            or None
1573        )
1574
1575    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1576    custom_source = workspace.publish_custom_source_definition(
1577        name=name,
1578        manifest_yaml=processed_manifest,
1579        unique=unique,
1580        pre_validate=pre_validate,
1581        testing_values=testing_values_dict,
1582    )
1583    register_guid_created_in_session(custom_source.definition_id)
1584    return (
1585        "Successfully published custom YAML source definition:\n"
1586        + _get_custom_source_definition_description(
1587            custom_source=custom_source,
1588        )
1589        + "\n"
1590    )
1591
1592
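A sketch of publishing a declarative (YAML) source definition from a local manifest file, with testing values supplied via a named secret; the file path and secret name are placeholders:

    from airbyte.mcp import cloud_ops

    message = cloud_ops.publish_custom_source_definition(
        name="My Custom API Source",
        workspace_id=None,
        manifest_yaml="./manifest.yaml",   # single-line strings are treated as file paths
        unique=True,
        pre_validate=True,                 # validate the manifest client-side first
        testing_values=None,
        testing_values_secret_name="MY_CUSTOM_SOURCE_TESTING_VALUES",
    )
    print(message)
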
1593@mcp_tool(
1594    domain="cloud",
1595    read_only=True,
1596    idempotent=True,
1597    open_world=True,
1598)
1599def list_custom_source_definitions(
1600    *,
1601    workspace_id: Annotated[
1602        str | None,
1603        Field(
1604            description=WORKSPACE_ID_TIP_TEXT,
1605            default=None,
1606        ),
1607    ],
1608) -> list[dict[str, Any]]:
1609    """List custom YAML source definitions in the Airbyte Cloud workspace.
1610
1611    Note: Only YAML (declarative) connectors are currently supported.
1612    Docker-based custom sources are not yet available.
1613    """
1614    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1615    definitions = workspace.list_custom_source_definitions(
1616        definition_type="yaml",
1617    )
1618
1619    return [
1620        {
1621            "definition_id": d.definition_id,
1622            "name": d.name,
1623            "version": d.version,
1624            "connector_builder_project_url": d.connector_builder_project_url,
1625        }
1626        for d in definitions
1627    ]
1628
1629
1630@mcp_tool(
1631    domain="cloud",
1632    read_only=True,
1633    idempotent=True,
1634    open_world=True,
1635)
1636def get_custom_source_definition(
1637    definition_id: Annotated[
1638        str,
1639        Field(description="The ID of the custom source definition to retrieve."),
1640    ],
1641    *,
1642    workspace_id: Annotated[
1643        str | None,
1644        Field(
1645            description=WORKSPACE_ID_TIP_TEXT,
1646            default=None,
1647        ),
1648    ],
1649) -> dict[str, Any]:
1650    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.
1651
1652    Returns the full definition details including the manifest YAML content,
1653    which can be used to inspect or store the connector configuration locally.
1654
1655    Note: Only YAML (declarative) connectors are currently supported.
1656    Docker-based custom sources are not yet available.
1657    """
1658    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1659    definition = workspace.get_custom_source_definition(
1660        definition_id=definition_id,
1661        definition_type="yaml",
1662    )
1663
1664    return {
1665        "definition_id": definition.definition_id,
1666        "name": definition.name,
1667        "version": definition.version,
1668        "connector_builder_project_id": definition.connector_builder_project_id,
1669        "connector_builder_project_url": definition.connector_builder_project_url,
1670        "manifest": definition.manifest,
1671    }
1672
1673
1674@mcp_tool(
1675    domain="cloud",
1676    destructive=True,
1677    open_world=True,
1678)
1679def update_custom_source_definition(
1680    definition_id: Annotated[
1681        str,
1682        Field(description="The ID of the definition to update."),
1683    ],
1684    manifest_yaml: Annotated[
1685        str | Path | None,
1686        Field(
1687            description=(
1688                "New manifest as YAML string or file path. "
1689                "Optional; omit to update only testing values."
1690            ),
1691            default=None,
1692        ),
1693    ] = None,
1694    *,
1695    workspace_id: Annotated[
1696        str | None,
1697        Field(
1698            description=WORKSPACE_ID_TIP_TEXT,
1699            default=None,
1700        ),
1701    ],
1702    pre_validate: Annotated[
1703        bool,
1704        Field(
1705            description="Whether to validate the manifest client-side before updating.",
1706            default=True,
1707        ),
1708    ] = True,
1709    testing_values: Annotated[
1710        dict | str | None,
1711        Field(
1712            description=(
1713                "Optional testing configuration values for the Builder UI. "
1714                "Can be provided as a JSON object or JSON string. "
1715                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1716                "If provided, these values replace any existing testing values "
1717                "for the connector builder project. The entire testing values object "
1718                "is overwritten, so pass the full set of values you want to persist."
1719            ),
1720            default=None,
1721        ),
1722    ],
1723    testing_values_secret_name: Annotated[
1724        str | None,
1725        Field(
1726            description=(
1727                "Optional name of a secret containing testing configuration values "
1728                "in JSON or YAML format. The secret will be resolved by the MCP "
1729                "server and merged into testing_values, with secret values taking "
1730                "precedence. This lets the agent reference secrets without sending "
1731                "raw values as tool arguments."
1732            ),
1733            default=None,
1734        ),
1735    ],
1736) -> str:
1737    """Update a custom YAML source definition in Airbyte Cloud.
1738
1739    Updates the manifest and/or testing values for an existing custom source definition.
1740    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1741    """
1742    check_guid_created_in_session(definition_id)
1743
1744    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1745
1746    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
1747        raise PyAirbyteInputError(
1748            message=(
1749                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
1750                "must be provided to update a custom source definition."
1751            ),
1752            context={
1753                "definition_id": definition_id,
1754                "workspace_id": workspace.workspace_id,
1755            },
1756        )
1757
1758    processed_manifest: str | Path | None = manifest_yaml
1759    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1760        processed_manifest = Path(manifest_yaml)
1761
1762    # Resolve testing values from inline config and/or secret
1763    testing_values_dict: dict[str, Any] | None = None
1764    if testing_values is not None or testing_values_secret_name is not None:
1765        testing_values_dict = (
1766            resolve_config(
1767                config=testing_values,
1768                config_secret_name=testing_values_secret_name,
1769            )
1770            or None
1771        )
1772
1773    definition = workspace.get_custom_source_definition(
1774        definition_id=definition_id,
1775        definition_type="yaml",
1776    )
1777    custom_source: CustomCloudSourceDefinition = definition
1778
1779    if processed_manifest is not None:
1780        custom_source = definition.update_definition(
1781            manifest_yaml=processed_manifest,
1782            pre_validate=pre_validate,
1783        )
1784
1785    if testing_values_dict is not None:
1786        custom_source.set_testing_values(testing_values_dict)
1787
1788    return (
1789        "Successfully updated custom YAML source definition:\n"
1790        + _get_custom_source_definition_description(
1791            custom_source=custom_source,
1792        )
1793    )
1794
1795
1796@mcp_tool(
1797    domain="cloud",
1798    destructive=True,
1799    open_world=True,
1800)
1801def permanently_delete_custom_source_definition(
1802    definition_id: Annotated[
1803        str,
1804        Field(description="The ID of the custom source definition to delete."),
1805    ],
1806    name: Annotated[
1807        str,
1808        Field(description="The expected name of the custom source definition (for verification)."),
1809    ],
1810    *,
1811    workspace_id: Annotated[
1812        str | None,
1813        Field(
1814            description=WORKSPACE_ID_TIP_TEXT,
1815            default=None,
1816        ),
1817    ],
1818) -> str:
1819    """Permanently delete a custom YAML source definition from Airbyte Cloud.
1820
1821    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
1822    (case insensitive).
1823
1824    If the connector does not meet this requirement, the deletion will be rejected with a
1825    helpful error message. Instruct the user to rename the connector appropriately to authorize
1826    the deletion.
1827
1828    The provided name must match the actual name of the definition for the operation to proceed.
1829    This is a safety measure to ensure you are deleting the correct resource.
1830
1831    Note: Only YAML (declarative) connectors are currently supported.
1832    Docker-based custom sources are not yet available.
1833    """
1834    check_guid_created_in_session(definition_id)
1835    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1836    definition = workspace.get_custom_source_definition(
1837        definition_id=definition_id,
1838        definition_type="yaml",
1839    )
1840    actual_name: str = definition.name
1841
1842    # Verify the name matches
1843    if actual_name != name:
1844        raise PyAirbyteInputError(
1845            message=(
1846                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1847                "The provided name must exactly match the definition's actual name. "
1848                "This is a safety measure to prevent accidental deletion."
1849            ),
1850            context={
1851                "definition_id": definition_id,
1852                "expected_name": name,
1853                "actual_name": actual_name,
1854            },
1855        )
1856
1857    definition.permanently_delete(
1858        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
1859    )
1860    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
1861
1862
1863@mcp_tool(
1864    domain="cloud",
1865    destructive=True,
1866    open_world=True,
1867    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1868)
1869def permanently_delete_cloud_source(
1870    source_id: Annotated[
1871        str,
1872        Field(description="The ID of the deployed source to delete."),
1873    ],
1874    name: Annotated[
1875        str,
1876        Field(description="The expected name of the source (for verification)."),
1877    ],
1878) -> str:
1879    """Permanently delete a deployed source connector from Airbyte Cloud.
1880
1881    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
1882    (case insensitive).
1883
1884    If the source does not meet this requirement, the deletion will be rejected with a
1885    helpful error message. Instruct the user to rename the source appropriately to authorize
1886    the deletion.
1887
1888    The provided name must match the actual name of the source for the operation to proceed.
1889    This is a safety measure to ensure you are deleting the correct resource.
1890    """
1891    check_guid_created_in_session(source_id)
1892    workspace: CloudWorkspace = _get_cloud_workspace()
1893    source = workspace.get_source(source_id=source_id)
1894    actual_name: str = cast(str, source.name)
1895
1896    # Verify the name matches
1897    if actual_name != name:
1898        raise PyAirbyteInputError(
1899            message=(
1900                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1901                "The provided name must exactly match the source's actual name. "
1902                "This is a safety measure to prevent accidental deletion."
1903            ),
1904            context={
1905                "source_id": source_id,
1906                "expected_name": name,
1907                "actual_name": actual_name,
1908            },
1909        )
1910
1911    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1912    workspace.permanently_delete_source(
1913        source=source_id,
1914        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
1915    )
1916    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
1917
1918
1919@mcp_tool(
1920    domain="cloud",
1921    destructive=True,
1922    open_world=True,
1923    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1924)
1925def permanently_delete_cloud_destination(
1926    destination_id: Annotated[
1927        str,
1928        Field(description="The ID of the deployed destination to delete."),
1929    ],
1930    name: Annotated[
1931        str,
1932        Field(description="The expected name of the destination (for verification)."),
1933    ],
1934) -> str:
1935    """Permanently delete a deployed destination connector from Airbyte Cloud.
1936
1937    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
1938    (case insensitive).
1939
1940    If the destination does not meet this requirement, the deletion will be rejected with a
1941    helpful error message. Instruct the user to rename the destination appropriately to authorize
1942    the deletion.
1943
1944    The provided name must match the actual name of the destination for the operation to proceed.
1945    This is a safety measure to ensure you are deleting the correct resource.
1946    """
1947    check_guid_created_in_session(destination_id)
1948    workspace: CloudWorkspace = _get_cloud_workspace()
1949    destination = workspace.get_destination(destination_id=destination_id)
1950    actual_name: str = cast(str, destination.name)
1951
1952    # Verify the name matches
1953    if actual_name != name:
1954        raise PyAirbyteInputError(
1955            message=(
1956                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1957                "The provided name must exactly match the destination's actual name. "
1958                "This is a safety measure to prevent accidental deletion."
1959            ),
1960            context={
1961                "destination_id": destination_id,
1962                "expected_name": name,
1963                "actual_name": actual_name,
1964            },
1965        )
1966
1967    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1968    workspace.permanently_delete_destination(
1969        destination=destination_id,
1970        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
1971    )
1972    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
1973
1974
1975@mcp_tool(
1976    domain="cloud",
1977    destructive=True,
1978    open_world=True,
1979    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1980)
1981def permanently_delete_cloud_connection(
1982    connection_id: Annotated[
1983        str,
1984        Field(description="The ID of the connection to delete."),
1985    ],
1986    name: Annotated[
1987        str,
1988        Field(description="The expected name of the connection (for verification)."),
1989    ],
1990    *,
1991    cascade_delete_source: Annotated[
1992        bool,
1993        Field(
1994            description=(
1995                "Whether to also delete the source connector associated with this connection."
1996            ),
1997            default=False,
1998        ),
1999    ] = False,
2000    cascade_delete_destination: Annotated[
2001        bool,
2002        Field(
2003            description=(
2004                "Whether to also delete the destination connector associated with this connection."
2005            ),
2006            default=False,
2007        ),
2008    ] = False,
2009) -> str:
2010    """Permanently delete a connection from Airbyte Cloud.
2011
2012    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
2013    (case insensitive).
2014
2015    If the connection does not meet this requirement, the deletion will be rejected with a
2016    helpful error message. Instruct the user to rename the connection appropriately to authorize
2017    the deletion.
2018
2019    The provided name must match the actual name of the connection for the operation to proceed.
2020    This is a safety measure to ensure you are deleting the correct resource.
2021    """
2022    check_guid_created_in_session(connection_id)
2023    workspace: CloudWorkspace = _get_cloud_workspace()
2024    connection = workspace.get_connection(connection_id=connection_id)
2025    actual_name: str = cast(str, connection.name)
2026
2027    # Verify the name matches
2028    if actual_name != name:
2029        raise PyAirbyteInputError(
2030            message=(
2031                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2032                "The provided name must exactly match the connection's actual name. "
2033                "This is a safety measure to prevent accidental deletion."
2034            ),
2035            context={
2036                "connection_id": connection_id,
2037                "expected_name": name,
2038                "actual_name": actual_name,
2039            },
2040        )
2041
2042    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2043    workspace.permanently_delete_connection(
2044        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
2045        connection=connection_id,
2046        cascade_delete_source=cascade_delete_source,
2047        cascade_delete_destination=cascade_delete_destination,
2048    )
2049    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
2050
2051
2052@mcp_tool(
2053    domain="cloud",
2054    open_world=True,
2055    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2056)
2057def rename_cloud_source(
2058    source_id: Annotated[
2059        str,
2060        Field(description="The ID of the deployed source to rename."),
2061    ],
2062    name: Annotated[
2063        str,
2064        Field(description="New name for the source."),
2065    ],
2066    *,
2067    workspace_id: Annotated[
2068        str | None,
2069        Field(
2070            description=WORKSPACE_ID_TIP_TEXT,
2071            default=None,
2072        ),
2073    ],
2074) -> str:
2075    """Rename a deployed source connector on Airbyte Cloud."""
2076    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2077    source = workspace.get_source(source_id=source_id)
2078    source.rename(name=name)
2079    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
2080
2081
2082@mcp_tool(
2083    domain="cloud",
2084    destructive=True,
2085    open_world=True,
2086    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2087)
2088def update_cloud_source_config(
2089    source_id: Annotated[
2090        str,
2091        Field(description="The ID of the deployed source to update."),
2092    ],
2093    config: Annotated[
2094        dict | str,
2095        Field(
2096            description="New configuration for the source connector.",
2097        ),
2098    ],
2099    config_secret_name: Annotated[
2100        str | None,
2101        Field(
2102            description="The name of the secret containing the configuration.",
2103            default=None,
2104        ),
2105    ] = None,
2106    *,
2107    workspace_id: Annotated[
2108        str | None,
2109        Field(
2110            description=WORKSPACE_ID_TIP_TEXT,
2111            default=None,
2112        ),
2113    ],
2114) -> str:
2115    """Update a deployed source connector's configuration on Airbyte Cloud.
2116
2117    This is a destructive operation that can break existing connections if the
2118    configuration is changed incorrectly. Use with caution.
2119    """
2120    check_guid_created_in_session(source_id)
2121    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2122    source = workspace.get_source(source_id=source_id)
2123
2124    config_dict = resolve_config(
2125        config=config,
2126        config_secret_name=config_secret_name,
2127        config_spec_jsonschema=None,  # We don't have the spec here
2128    )
2129
2130    source.update_config(config=config_dict)
2131    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
2132
2133
2134@mcp_tool(
2135    domain="cloud",
2136    open_world=True,
2137    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2138)
2139def rename_cloud_destination(
2140    destination_id: Annotated[
2141        str,
2142        Field(description="The ID of the deployed destination to rename."),
2143    ],
2144    name: Annotated[
2145        str,
2146        Field(description="New name for the destination."),
2147    ],
2148    *,
2149    workspace_id: Annotated[
2150        str | None,
2151        Field(
2152            description=WORKSPACE_ID_TIP_TEXT,
2153            default=None,
2154        ),
2155    ],
2156) -> str:
2157    """Rename a deployed destination connector on Airbyte Cloud."""
2158    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2159    destination = workspace.get_destination(destination_id=destination_id)
2160    destination.rename(name=name)
2161    return (
2162        f"Successfully renamed destination '{destination_id}' to '{name}'. "
2163        f"URL: {destination.connector_url}"
2164    )
2165
2166
2167@mcp_tool(
2168    domain="cloud",
2169    destructive=True,
2170    open_world=True,
2171    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2172)
2173def update_cloud_destination_config(
2174    destination_id: Annotated[
2175        str,
2176        Field(description="The ID of the deployed destination to update."),
2177    ],
2178    config: Annotated[
2179        dict | str,
2180        Field(
2181            description="New configuration for the destination connector.",
2182        ),
2183    ],
2184    config_secret_name: Annotated[
2185        str | None,
2186        Field(
2187            description="The name of the secret containing the configuration.",
2188            default=None,
2189        ),
2190    ],
2191    *,
2192    workspace_id: Annotated[
2193        str | None,
2194        Field(
2195            description=WORKSPACE_ID_TIP_TEXT,
2196            default=None,
2197        ),
2198    ],
2199) -> str:
2200    """Update a deployed destination connector's configuration on Airbyte Cloud.
2201
2202    This is a destructive operation that can break existing connections if the
2203    configuration is changed incorrectly. Use with caution.
2204    """
2205    check_guid_created_in_session(destination_id)
2206    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2207    destination = workspace.get_destination(destination_id=destination_id)
2208
2209    config_dict = resolve_config(
2210        config=config,
2211        config_secret_name=config_secret_name,
2212        config_spec_jsonschema=None,  # We don't have the spec here
2213    )
2214
2215    destination.update_config(config=config_dict)
2216    return (
2217        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
2218    )
2219
2220
2221@mcp_tool(
2222    domain="cloud",
2223    open_world=True,
2224    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2225)
2226def rename_cloud_connection(
2227    connection_id: Annotated[
2228        str,
2229        Field(description="The ID of the connection to rename."),
2230    ],
2231    name: Annotated[
2232        str,
2233        Field(description="New name for the connection."),
2234    ],
2235    *,
2236    workspace_id: Annotated[
2237        str | None,
2238        Field(
2239            description=WORKSPACE_ID_TIP_TEXT,
2240            default=None,
2241        ),
2242    ],
2243) -> str:
2244    """Rename a connection on Airbyte Cloud."""
2245    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2246    connection = workspace.get_connection(connection_id=connection_id)
2247    connection.rename(name=name)
2248    return (
2249        f"Successfully renamed connection '{connection_id}' to '{name}'. "
2250        f"URL: {connection.connection_url}"
2251    )
2252
2253
2254@mcp_tool(
2255    domain="cloud",
2256    destructive=True,
2257    open_world=True,
2258    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2259)
2260def set_cloud_connection_table_prefix(
2261    connection_id: Annotated[
2262        str,
2263        Field(description="The ID of the connection to update."),
2264    ],
2265    prefix: Annotated[
2266        str,
2267        Field(description="New table prefix to use when syncing to the destination."),
2268    ],
2269    *,
2270    workspace_id: Annotated[
2271        str | None,
2272        Field(
2273            description=WORKSPACE_ID_TIP_TEXT,
2274            default=None,
2275        ),
2276    ],
2277) -> str:
2278    """Set the table prefix for a connection on Airbyte Cloud.
2279
2280    This is a destructive operation that can break downstream dependencies if the
2281    table prefix is changed incorrectly. Use with caution.
2282    """
2283    check_guid_created_in_session(connection_id)
2284    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2285    connection = workspace.get_connection(connection_id=connection_id)
2286    connection.set_table_prefix(prefix=prefix)
2287    return (
2288        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
2289        f"URL: {connection.connection_url}"
2290    )
2291
2292
2293@mcp_tool(
2294    domain="cloud",
2295    destructive=True,
2296    open_world=True,
2297    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2298)
2299def set_cloud_connection_selected_streams(
2300    connection_id: Annotated[
2301        str,
2302        Field(description="The ID of the connection to update."),
2303    ],
2304    stream_names: Annotated[
2305        str | list[str],
2306        Field(
2307            description=(
2308                "The selected stream names to sync within the connection. "
2309                "Must be an explicit stream name or list of streams."
2310            )
2311        ),
2312    ],
2313    *,
2314    workspace_id: Annotated[
2315        str | None,
2316        Field(
2317            description=WORKSPACE_ID_TIP_TEXT,
2318            default=None,
2319        ),
2320    ],
2321) -> str:
2322    """Set the selected streams for a connection on Airbyte Cloud.
2323
2324    This is a destructive operation that can break existing connections if the
2325    stream selection is changed incorrectly. Use with caution.
2326    """
2327    check_guid_created_in_session(connection_id)
2328    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2329    connection = workspace.get_connection(connection_id=connection_id)
2330
2331    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
2332    connection.set_selected_streams(stream_names=resolved_streams_list)
2333
2334    return (
2335        f"Successfully set selected streams for connection '{connection_id}' "
2336        f"to {resolved_streams_list}. URL: {connection.connection_url}"
2337    )
2338
2339
2340@mcp_tool(
2341    domain="cloud",
2342    open_world=True,
2343    destructive=True,
2344    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2345)
2346def update_cloud_connection(
2347    connection_id: Annotated[
2348        str,
2349        Field(description="The ID of the connection to update."),
2350    ],
2351    *,
2352    enabled: Annotated[
2353        bool | None,
2354        Field(
2355            description=(
2356                "Set the connection's enabled status. "
2357                "True enables the connection (status='active'), "
2358                "False disables it (status='inactive'). "
2359                "Leave unset to keep the current status."
2360            ),
2361            default=None,
2362        ),
2363    ],
2364    cron_expression: Annotated[
2365        str | None,
2366        Field(
2367            description=(
2368                "A cron expression defining when syncs should run. "
2369                "Examples: '0 0 * * *' (daily at midnight UTC), "
2370                "'0 */6 * * *' (every 6 hours), "
2371                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
2372                "Leave unset to keep the current schedule. "
2373                "Cannot be used together with 'manual_schedule'."
2374            ),
2375            default=None,
2376        ),
2377    ],
2378    manual_schedule: Annotated[
2379        bool | None,
2380        Field(
2381            description=(
2382                "Set to True to disable automatic syncs (manual scheduling only). "
2383                "Syncs will only run when manually triggered. "
2384                "Cannot be used together with 'cron_expression'."
2385            ),
2386            default=None,
2387        ),
2388    ],
2389    workspace_id: Annotated[
2390        str | None,
2391        Field(
2392            description=WORKSPACE_ID_TIP_TEXT,
2393            default=None,
2394        ),
2395    ],
2396) -> str:
2397    """Update a connection's settings on Airbyte Cloud.
2398
2399    This tool allows updating multiple connection settings in a single call:
2400    - Enable or disable the connection
2401    - Set a cron schedule for automatic syncs
2402    - Switch to manual scheduling (no automatic syncs)
2403
2404    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
2405    parameters are mutually exclusive.
2406    """
2407    check_guid_created_in_session(connection_id)
2408
2409    # Validate that at least one setting is provided
2410    if enabled is None and cron_expression is None and manual_schedule is None:
2411        raise ValueError(
2412            "At least one setting must be provided: 'enabled', 'cron_expression', "
2413            "or 'manual_schedule'."
2414        )
2415
2416    # Validate mutually exclusive schedule options
2417    if cron_expression is not None and manual_schedule is True:
2418        raise ValueError(
2419            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
2420            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
2421            "for manual-only syncs."
2422        )
2423
2424    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2425    connection = workspace.get_connection(connection_id=connection_id)
2426
2427    changes_made: list[str] = []
2428
2429    # Apply enabled status change
2430    if enabled is not None:
2431        connection.set_enabled(enabled=enabled)
2432        status_str = "enabled" if enabled else "disabled"
2433        changes_made.append(f"status set to '{status_str}'")
2434
2435    # Apply schedule change
2436    if cron_expression is not None:
2437        connection.set_schedule(cron_expression=cron_expression)
2438        changes_made.append(f"schedule set to '{cron_expression}'")
2439    elif manual_schedule is True:
2440        connection.set_manual_schedule()
2441        changes_made.append("schedule set to 'manual'")
2442
2443    changes_summary = ", ".join(changes_made)
2444    return (
2445        f"Successfully updated connection '{connection_id}': {changes_summary}. "
2446        f"URL: {connection.connection_url}"
2447    )
2448
2449
2450@mcp_tool(
2451    domain="cloud",
2452    read_only=True,
2453    idempotent=True,
2454    open_world=True,
2455    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2456)
2457def get_connection_artifact(
2458    connection_id: Annotated[
2459        str,
2460        Field(description="The ID of the Airbyte Cloud connection."),
2461    ],
2462    artifact_type: Annotated[
2463        Literal["state", "catalog"],
2464        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
2465    ],
2466    *,
2467    workspace_id: Annotated[
2468        str | None,
2469        Field(
2470            description=WORKSPACE_ID_TIP_TEXT,
2471            default=None,
2472        ),
2473    ],
2474) -> dict[str, Any] | list[dict[str, Any]]:
2475    """Get a connection artifact (state or catalog) from Airbyte Cloud.
2476
2477    Retrieves the specified artifact for a connection:
2478    - 'state': Returns the persisted state for incremental syncs as a list of
2479      stream state objects, or {"ERROR": "..."} if no state is set.
2480    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
2481      or {"ERROR": "..."} if not found.
2482    """
2483    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2484    connection = workspace.get_connection(connection_id=connection_id)
2485
2486    if artifact_type == "state":
2487        result = connection.get_state_artifacts()
2488        if result is None:
2489            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
2490        return result
2491
2492    # artifact_type == "catalog"
2493    result = connection.get_catalog_artifact()
2494    if result is None:
2495        return {"ERROR": "No catalog found for this connection"}
2496    return result
2497
2498
2499def register_cloud_ops_tools(app: FastMCP) -> None:
2500    """@private Register tools with the FastMCP app.
2501
2502    This is an internal function and should not be called directly.
2503
2504    Tools are filtered based on mode settings:
2505    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
2506    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
2507      operations are protected by runtime session checks
2508    """
2509    register_tools(app, domain="cloud")
CLOUD_AUTH_TIP_TEXT = 'By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables will be used to authenticate with the Airbyte Cloud API.'
WORKSPACE_ID_TIP_TEXT = 'Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.'
class CloudSourceResult(pydantic.main.BaseModel):

Information about a deployed source connector in Airbyte Cloud.

id: str
The source ID.

name: str
Display name of the source.

url: str
Web URL for managing this source in Airbyte Cloud.

class CloudDestinationResult(pydantic.main.BaseModel):

Information about a deployed destination connector in Airbyte Cloud.

id: str
The destination ID.

name: str
Display name of the destination.

url: str
Web URL for managing this destination in Airbyte Cloud.

class CloudConnectionResult(pydantic.main.BaseModel):

Information about a deployed connection in Airbyte Cloud.

id: str
The connection ID.

name: str
Display name of the connection.

url: str
Web URL for managing this connection in Airbyte Cloud.

source_id: str
ID of the source used by this connection.

destination_id: str
ID of the destination used by this connection.

last_job_status: str | None = None
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.

last_job_id: int | None = None
Job ID of the most recent completed sync. Only populated when with_connection_status=True.

last_job_time: str | None = None
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.

currently_running_job_id: int | None = None
Job ID of a currently running sync, if any. Only populated when with_connection_status=True.

currently_running_job_start_time: str | None = None
ISO 8601 timestamp of when the currently running sync started. Only populated when with_connection_status=True.
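
For illustration, a fully populated result might look like the sketch below; every value is a placeholder, and the last_job_* / currently_running_* fields are only filled in when the listing is requested with with_connection_status=True.

    # Hypothetical instance; only the field names and types come from the model above.
    from airbyte.mcp.cloud_ops import CloudConnectionResult

    example = CloudConnectionResult(
        id="<connection-id>",
        name="faker-to-duckdb",
        url="https://cloud.airbyte.com/workspaces/<workspace-id>/connections/<connection-id>",
        source_id="<source-id>",
        destination_id="<destination-id>",
        last_job_status="succeeded",
        last_job_id=123456,
        last_job_time="2024-06-01T12:34:56Z",
    )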

class CloudSourceDetails(pydantic.main.BaseModel):

Detailed information about a deployed source connector in Airbyte Cloud.

source_id: str
The source ID.

source_name: str
Display name of the source.

source_url: str
Web URL for managing this source in Airbyte Cloud.

connector_definition_id: str
The connector definition ID (e.g., the ID for 'source-postgres').

class CloudDestinationDetails(pydantic.main.BaseModel):

Detailed information about a deployed destination connector in Airbyte Cloud.

destination_id: str
The destination ID.

destination_name: str
Display name of the destination.

destination_url: str
Web URL for managing this destination in Airbyte Cloud.

connector_definition_id: str
The connector definition ID (e.g., the ID for 'destination-snowflake').

class CloudConnectionDetails(pydantic.main.BaseModel):

Detailed information about a deployed connection in Airbyte Cloud.

connection_id: str
The connection ID.

connection_name: str
Display name of the connection.

connection_url: str
Web URL for managing this connection in Airbyte Cloud.

source_id: str
ID of the source used by this connection.

source_name: str
Display name of the source.

destination_id: str
ID of the destination used by this connection.

destination_name: str
Display name of the destination.

selected_streams: list[str]
List of stream names selected for syncing.

table_prefix: str | None
Table prefix applied when syncing to the destination.

class CloudOrganizationResult(pydantic.main.BaseModel):

Information about an organization in Airbyte Cloud.

id: str
The organization ID.

name: str
Display name of the organization.

email: str
Email associated with the organization.

class CloudWorkspaceResult(pydantic.main.BaseModel):

Information about a workspace in Airbyte Cloud.

workspace_id: str
The workspace ID.

workspace_name: str
Display name of the workspace.

workspace_url: str | None = None
URL to access the workspace in Airbyte Cloud.

organization_id: str
ID of the organization (requires ORGANIZATION_READER permission).

organization_name: str | None = None
Name of the organization (requires ORGANIZATION_READER permission).

class LogReadResult(pydantic.main.BaseModel):

Result of reading sync logs with pagination support.

job_id: int
The job ID the logs belong to.

attempt_number: int
The attempt number the logs belong to.

log_text: str
The string containing the log text we are returning.

log_text_start_line: int
1-based line index of the first line returned.

log_text_line_count: int
Count of lines we are returning.

total_log_lines_available: int
Total number of log lines available, shows if any lines were missed due to the limit.
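
The start-line, line-count, and total fields allow simple client-side pagination. A small sketch with made-up values:

    from airbyte.mcp.cloud_ops import LogReadResult

    # Hypothetical page of logs; only the field names come from the model above.
    page = LogReadResult(
        job_id=123456,
        attempt_number=0,
        log_text="line 1\nline 2\nline 3",
        log_text_start_line=1,
        log_text_line_count=3,
        total_log_lines_available=10,
    )
    # More lines remain when the last returned line index is below the total available.
    last_line_returned = page.log_text_start_line + page.log_text_line_count - 1
    more_lines_remaining = last_line_returned < page.total_log_lines_available  # True here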

class SyncJobResult(pydantic.main.BaseModel):

Information about a sync job.

job_id: int
The job ID.

status: str
The job status (e.g., 'succeeded', 'failed', 'running', 'pending').

bytes_synced: int
Number of bytes synced in this job.

records_synced: int
Number of records synced in this job.

start_time: str
ISO 8601 timestamp of when the job started.

job_url: str
URL to view the job in Airbyte Cloud.

class SyncJobListResult(pydantic.main.BaseModel):

Result of listing sync jobs with pagination support.

jobs: list[SyncJobResult]
List of sync jobs.

jobs_count: int
Number of jobs returned in this response.

jobs_offset: int
Offset used for this request (0 if not specified).

from_tail: bool
Whether jobs are ordered newest-first (True) or oldest-first (False).
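
A small sketch (values made up) of scanning a job listing for failures:

    from airbyte.mcp.cloud_ops import SyncJobListResult, SyncJobResult

    # Hypothetical listing containing a single failed job; all values are placeholders.
    job_list = SyncJobListResult(
        jobs=[
            SyncJobResult(
                job_id=123456,
                status="failed",
                bytes_synced=0,
                records_synced=0,
                start_time="2024-06-01T12:34:56Z",
                job_url="https://cloud.airbyte.com/workspaces/<workspace-id>/job-history",
            )
        ],
        jobs_count=1,
        jobs_offset=0,
        from_tail=True,  # newest first
    )
    failed_jobs = [job for job in job_list.jobs if job.status == "failed"]
    for job in failed_jobs:
        print(f"Job {job.job_id} failed; see {job.job_url}")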

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud(
    source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')],
    source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")],
    *,
    workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')],
    config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')],
    config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')],
    unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')],
) -> str:

Deploy a source connector to Airbyte Cloud.
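
A rough sketch of the direct PyAirbyte calls this tool wraps is shown below. The CloudWorkspace constructor arguments and the source-faker config values are assumptions for illustration; the tool itself resolves credentials from the environment variables noted above.

    # Illustrative sketch only; IDs, credentials, and config values are placeholders.
    from airbyte import get_source
    from airbyte.cloud.workspaces import CloudWorkspace

    workspace = CloudWorkspace(
        workspace_id="<AIRBYTE_CLOUD_WORKSPACE_ID>",
        client_id="<AIRBYTE_CLOUD_CLIENT_ID>",
        client_secret="<AIRBYTE_CLOUD_CLIENT_SECRET>",
    )
    source = get_source("source-faker", no_executor=True)  # no local executor needed to deploy
    source.set_config({"count": 100}, validate=True)  # hypothetical source-faker config
    deployed = workspace.deploy_source(name="My Faker Source", source=source, unique=True)
    print(deployed.connector_id, deployed.connector_url)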

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_destination_to_cloud(
    destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')],
    destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")],
    *,
    workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')],
    config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')],
    config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')],
    unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')],
) -> str:

Deploy a destination connector to Airbyte Cloud.
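
As a sketch, the no-op testing destination (which this module also imports) can be deployed the same way; the CloudWorkspace arguments are placeholders, and get_noop_destination() is assumed to need no configuration for this purpose.

    # Illustrative sketch only; credentials are placeholders. The display name includes
    # "delete-me" so the connector can later be removed via the permanently_delete_* tools.
    from airbyte.cloud.workspaces import CloudWorkspace
    from airbyte.destinations.util import get_noop_destination

    workspace = CloudWorkspace(
        workspace_id="<workspace-id>", client_id="<client-id>", client_secret="<client-secret>"
    )
    destination = get_noop_destination()
    deployed = workspace.deploy_destination(
        name="Test Destination (delete-me)",
        destination=destination,
        unique=True,
    )
    print(deployed.connector_id)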

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud(
    connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')],
    source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')],
    destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')],
    selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")],
    *,
    workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')],
    table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')],
) -> str:

Create a connection between a deployed source and destination on Airbyte Cloud.
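
Continuing the sketches above, a connection ties an already-deployed source and destination together. All IDs, stream names, and credentials below are placeholders.

    # Illustrative sketch only.
    from airbyte.cloud.workspaces import CloudWorkspace

    workspace = CloudWorkspace(
        workspace_id="<workspace-id>", client_id="<client-id>", client_secret="<client-secret>"
    )
    connection = workspace.deploy_connection(
        connection_name="faker-to-test (delete-me)",
        source="<deployed-source-id>",
        destination="<deployed-destination-id>",
        selected_streams=["users", "products"],
        table_prefix="faker_",
    )
    print(connection.connection_id, connection.connection_url)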

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync(
    connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')],
    *,
    workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')],
    wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.')],
    wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')],
) -> str:

Run a sync job on Airbyte Cloud.
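
Example: a minimal sketch that starts a sync without waiting, assuming Cloud credentials come from environment variables; the connection ID is a placeholder and Field-metadata defaults are passed explicitly.

from airbyte.mcp.cloud_ops import run_cloud_sync

print(
    run_cloud_sync(
        connection_id="33333333-3333-3333-3333-333333333333",  # placeholder
        workspace_id=None,   # use AIRBYTE_CLOUD_WORKSPACE_ID
        wait=False,          # return immediately with the job ID and URL
        wait_timeout=300,    # only applies when wait=True
    )
)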

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def check_airbyte_cloud_workspace( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudWorkspaceResult:
492@mcp_tool(
493    domain="cloud",
494    read_only=True,
495    idempotent=True,
496    open_world=True,
497    extra_help_text=CLOUD_AUTH_TIP_TEXT,
498)
499def check_airbyte_cloud_workspace(
500    *,
501    workspace_id: Annotated[
502        str | None,
503        Field(
504            description=WORKSPACE_ID_TIP_TEXT,
505            default=None,
506        ),
507    ],
508) -> CloudWorkspaceResult:
509    """Check if we have a valid Airbyte Cloud connection and return workspace info.
510
511    Returns workspace details including workspace ID, name, and organization info.
512    """
513    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
514
515    # Get workspace details from the public API using workspace's credentials
516    workspace_response = api_util.get_workspace(
517        workspace_id=workspace.workspace_id,
518        api_root=workspace.api_root,
519        client_id=workspace.client_id,
520        client_secret=workspace.client_secret,
521        bearer_token=workspace.bearer_token,
522    )
523
524    # Try to get organization info, but fail gracefully if we don't have permissions.
525    # Fetching organization info requires ORGANIZATION_READER permissions on the organization,
526    # which may not be available with workspace-scoped credentials.
527    organization = workspace.get_organization(raise_on_error=False)
528
529    return CloudWorkspaceResult(
530        workspace_id=workspace_response.workspace_id,
531        workspace_name=workspace_response.name,
532        workspace_url=workspace.workspace_url,
533        organization_id=(
534            organization.organization_id
535            if organization
536            else "[unavailable - requires ORGANIZATION_READER permission]"
537        ),
538        organization_name=organization.organization_name if organization else None,
539    )

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, and organization info.
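
Example: a minimal sketch that validates credentials resolved from the environment and prints the resulting workspace details.

from airbyte.mcp.cloud_ops import check_airbyte_cloud_workspace

result = check_airbyte_cloud_workspace(workspace_id=None)  # None -> AIRBYTE_CLOUD_WORKSPACE_ID
print(result.workspace_id, result.workspace_name, result.workspace_url)
print(result.organization_id, result.organization_name)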

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_noop_destination_to_cloud( name: str = 'No-op Destination', *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], unique: bool = True) -> str:
542@mcp_tool(
543    domain="cloud",
544    open_world=True,
545    extra_help_text=CLOUD_AUTH_TIP_TEXT,
546)
547def deploy_noop_destination_to_cloud(
548    name: str = "No-op Destination",
549    *,
550    workspace_id: Annotated[
551        str | None,
552        Field(
553            description=WORKSPACE_ID_TIP_TEXT,
554            default=None,
555        ),
556    ],
557    unique: bool = True,
558) -> str:
559    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
560    destination = get_noop_destination()
561    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
562    deployed_destination = workspace.deploy_destination(
563        name=name,
564        destination=destination,
565        unique=unique,
566    )
567    register_guid_created_in_session(deployed_destination.connector_id)
568    return (
569        f"Successfully deployed No-op Destination "
570        f"with ID '{deployed_destination.connector_id}' and "
571        f"URL: {deployed_destination.connector_url}"
572    )

Deploy the No-op destination to Airbyte Cloud for testing purposes.
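
Example: a minimal sketch, assuming environment-based credentials; the display name is arbitrary.

from airbyte.mcp.cloud_ops import deploy_noop_destination_to_cloud

print(
    deploy_noop_destination_to_cloud(
        name="No-op Destination (CI)",
        workspace_id=None,  # use AIRBYTE_CLOUD_WORKSPACE_ID
        unique=True,        # require that the name is not already taken
    )
)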

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
575@mcp_tool(
576    domain="cloud",
577    read_only=True,
578    idempotent=True,
579    open_world=True,
580    extra_help_text=CLOUD_AUTH_TIP_TEXT,
581)
582def get_cloud_sync_status(
583    connection_id: Annotated[
584        str,
585        Field(
586            description="The ID of the Airbyte Cloud connection.",
587        ),
588    ],
589    job_id: Annotated[
590        int | None,
591        Field(
592            description="Optional job ID. If not provided, the latest job will be used.",
593            default=None,
594        ),
595    ],
596    *,
597    workspace_id: Annotated[
598        str | None,
599        Field(
600            description=WORKSPACE_ID_TIP_TEXT,
601            default=None,
602        ),
603    ],
604    include_attempts: Annotated[
605        bool,
606        Field(
607            description="Whether to include detailed attempts information.",
608            default=False,
609        ),
610    ],
611) -> dict[str, Any]:
612    """Get the status of a sync job from the Airbyte Cloud."""
613    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
614    connection = workspace.get_connection(connection_id=connection_id)
615
616    # If a job ID is provided, get the job by ID.
617    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
618
619    if not sync_result:
620        return {"status": None, "job_id": None, "attempts": []}
621
622    result = {
623        "status": sync_result.get_job_status(),
624        "job_id": sync_result.job_id,
625        "bytes_synced": sync_result.bytes_synced,
626        "records_synced": sync_result.records_synced,
627        "start_time": sync_result.start_time.isoformat(),
628        "job_url": sync_result.job_url,
629        "attempts": [],
630    }
631
632    if include_attempts:
633        attempts = sync_result.get_attempts()
634        result["attempts"] = [
635            {
636                "attempt_number": attempt.attempt_number,
637                "attempt_id": attempt.attempt_id,
638                "status": attempt.status,
639                "bytes_synced": attempt.bytes_synced,
640                "records_synced": attempt.records_synced,
641                "created_at": attempt.created_at.isoformat(),
642            }
643            for attempt in attempts
644        ]
645
646    return result

Get the status of a sync job from Airbyte Cloud.
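
Example: a minimal sketch that fetches the latest job's status for a placeholder connection, including per-attempt details.

from airbyte.mcp.cloud_ops import get_cloud_sync_status

status = get_cloud_sync_status(
    connection_id="33333333-3333-3333-3333-333333333333",  # placeholder
    job_id=None,            # None -> latest job
    workspace_id=None,
    include_attempts=True,  # include the "attempts" list in the result
)
print(status["status"], status["job_id"], len(status["attempts"]))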

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_sync_jobs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_jobs: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=20, description='Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.')], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description='When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`.')], jobs_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`.')]) -> SyncJobListResult:
649@mcp_tool(
650    domain="cloud",
651    read_only=True,
652    idempotent=True,
653    open_world=True,
654    extra_help_text=CLOUD_AUTH_TIP_TEXT,
655)
656def list_cloud_sync_jobs(
657    connection_id: Annotated[
658        str,
659        Field(description="The ID of the Airbyte Cloud connection."),
660    ],
661    *,
662    workspace_id: Annotated[
663        str | None,
664        Field(
665            description=WORKSPACE_ID_TIP_TEXT,
666            default=None,
667        ),
668    ],
669    max_jobs: Annotated[
670        int,
671        Field(
672            description=(
673                "Maximum number of jobs to return. "
674                "Defaults to 20 if not specified. "
675                "Maximum allowed value is 500."
676            ),
677            default=20,
678        ),
679    ],
680    from_tail: Annotated[
681        bool | None,
682        Field(
683            description=(
684                "When True, jobs are ordered newest-first (createdAt DESC). "
685                "When False, jobs are ordered oldest-first (createdAt ASC). "
686                "Defaults to True if `jobs_offset` is not specified. "
687                "Cannot combine `from_tail=True` with `jobs_offset`."
688            ),
689            default=None,
690        ),
691    ],
692    jobs_offset: Annotated[
693        int | None,
694        Field(
695            description=(
696                "Number of jobs to skip from the beginning. "
697                "Cannot be combined with `from_tail=True`."
698            ),
699            default=None,
700        ),
701    ],
702) -> SyncJobListResult:
703    """List sync jobs for a connection with pagination support.
704
705    This tool allows you to retrieve a list of sync jobs for a connection,
706    with control over ordering and pagination. By default, jobs are returned
707    newest-first (from_tail=True).
708    """
709    # Validate that jobs_offset and from_tail are not both set
710    if jobs_offset is not None and from_tail is True:
711        raise PyAirbyteInputError(
712            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
713            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
714        )
715
716    # Default to from_tail=True if neither is specified
717    if from_tail is None and jobs_offset is None:
718        from_tail = True
719    elif from_tail is None:
720        from_tail = False
721
722    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
723    connection = workspace.get_connection(connection_id=connection_id)
724
725    # Cap at 500 to avoid overloading agent context
726    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
727
728    sync_results = connection.get_previous_sync_logs(
729        limit=effective_limit,
730        offset=jobs_offset,
731        from_tail=from_tail,
732    )
733
734    jobs = [
735        SyncJobResult(
736            job_id=sync_result.job_id,
737            status=str(sync_result.get_job_status()),
738            bytes_synced=sync_result.bytes_synced,
739            records_synced=sync_result.records_synced,
740            start_time=sync_result.start_time.isoformat(),
741            job_url=sync_result.job_url,
742        )
743        for sync_result in sync_results
744    ]
745
746    return SyncJobListResult(
747        jobs=jobs,
748        jobs_count=len(jobs),
749        jobs_offset=jobs_offset or 0,
750        from_tail=from_tail,
751    )

List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection, with control over ordering and pagination. By default, jobs are returned newest-first (from_tail=True).
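
Example: a minimal sketch listing the ten most recent jobs for a placeholder connection (newest-first by default because no offset is given).

from airbyte.mcp.cloud_ops import list_cloud_sync_jobs

page = list_cloud_sync_jobs(
    connection_id="33333333-3333-3333-3333-333333333333",  # placeholder
    workspace_id=None,
    max_jobs=10,
    from_tail=None,    # defaults to newest-first since jobs_offset is unset
    jobs_offset=None,
)
for job in page.jobs:
    print(job.job_id, job.status, job.start_time, job.job_url)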

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter sources by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudSourceResult]:
754@mcp_tool(
755    domain="cloud",
756    read_only=True,
757    idempotent=True,
758    open_world=True,
759    extra_help_text=CLOUD_AUTH_TIP_TEXT,
760)
761def list_deployed_cloud_source_connectors(
762    *,
763    workspace_id: Annotated[
764        str | None,
765        Field(
766            description=WORKSPACE_ID_TIP_TEXT,
767            default=None,
768        ),
769    ],
770    name_contains: Annotated[
771        str | None,
772        Field(
773            description="Optional case-insensitive substring to filter sources by name",
774            default=None,
775        ),
776    ],
777    max_items_limit: Annotated[
778        int | None,
779        Field(
780            description="Optional maximum number of items to return (default: no limit)",
781            default=None,
782        ),
783    ],
784) -> list[CloudSourceResult]:
785    """List all deployed source connectors in the Airbyte Cloud workspace."""
786    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
787    sources = workspace.list_sources()
788
789    # Filter by name if requested
790    if name_contains:
791        needle = name_contains.lower()
792        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
793
794    # Apply limit if requested
795    if max_items_limit is not None:
796        sources = sources[:max_items_limit]
797
798    # Note: name and url are guaranteed non-null from list API responses
799    return [
800        CloudSourceResult(
801            id=source.source_id,
802            name=cast(str, source.name),
803            url=cast(str, source.connector_url),
804        )
805        for source in sources
806    ]

List all deployed source connectors in the Airbyte Cloud workspace.
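
Example: a minimal sketch filtering deployed sources by a name substring; the filter value is arbitrary.

from airbyte.mcp.cloud_ops import list_deployed_cloud_source_connectors

sources = list_deployed_cloud_source_connectors(
    workspace_id=None,
    name_contains="github",  # case-insensitive substring match
    max_items_limit=25,
)
for source in sources:
    print(source.id, source.name, source.url)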

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter destinations by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudDestinationResult]:
809@mcp_tool(
810    domain="cloud",
811    read_only=True,
812    idempotent=True,
813    open_world=True,
814    extra_help_text=CLOUD_AUTH_TIP_TEXT,
815)
816def list_deployed_cloud_destination_connectors(
817    *,
818    workspace_id: Annotated[
819        str | None,
820        Field(
821            description=WORKSPACE_ID_TIP_TEXT,
822            default=None,
823        ),
824    ],
825    name_contains: Annotated[
826        str | None,
827        Field(
828            description="Optional case-insensitive substring to filter destinations by name",
829            default=None,
830        ),
831    ],
832    max_items_limit: Annotated[
833        int | None,
834        Field(
835            description="Optional maximum number of items to return (default: no limit)",
836            default=None,
837        ),
838    ],
839) -> list[CloudDestinationResult]:
840    """List all deployed destination connectors in the Airbyte Cloud workspace."""
841    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
842    destinations = workspace.list_destinations()
843
844    # Filter by name if requested
845    if name_contains:
846        needle = name_contains.lower()
847        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
848
849    # Apply limit if requested
850    if max_items_limit is not None:
851        destinations = destinations[:max_items_limit]
852
853    # Note: name and url are guaranteed non-null from list API responses
854    return [
855        CloudDestinationResult(
856            id=destination.destination_id,
857            name=cast(str, destination.name),
858            url=cast(str, destination.connector_url),
859        )
860        for destination in destinations
861    ]

List all deployed destination connectors in the Airbyte Cloud workspace.
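
Example: the same pattern as the source listing above, applied to destinations.

from airbyte.mcp.cloud_ops import list_deployed_cloud_destination_connectors

destinations = list_deployed_cloud_destination_connectors(
    workspace_id=None,
    name_contains=None,    # no name filter
    max_items_limit=None,  # no cap
)
for destination in destinations:
    print(destination.id, destination.name, destination.url)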

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the source to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudSourceDetails:
864@mcp_tool(
865    domain="cloud",
866    read_only=True,
867    idempotent=True,
868    open_world=True,
869    extra_help_text=CLOUD_AUTH_TIP_TEXT,
870)
871def describe_cloud_source(
872    source_id: Annotated[
873        str,
874        Field(description="The ID of the source to describe."),
875    ],
876    *,
877    workspace_id: Annotated[
878        str | None,
879        Field(
880            description=WORKSPACE_ID_TIP_TEXT,
881            default=None,
882        ),
883    ],
884) -> CloudSourceDetails:
885    """Get detailed information about a specific deployed source connector."""
886    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
887    source = workspace.get_source(source_id=source_id)
888
889    # Access name property to ensure _connector_info is populated
890    source_name = cast(str, source.name)
891
892    return CloudSourceDetails(
893        source_id=source.source_id,
894        source_name=source_name,
895        source_url=source.connector_url,
896        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
897    )

Get detailed information about a specific deployed source connector.
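
Example: a minimal sketch with a placeholder source ID.

from airbyte.mcp.cloud_ops import describe_cloud_source

details = describe_cloud_source(
    source_id="11111111-1111-1111-1111-111111111111",  # placeholder
    workspace_id=None,
)
print(details.source_name, details.connector_definition_id, details.source_url)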

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the destination to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudDestinationDetails:
900@mcp_tool(
901    domain="cloud",
902    read_only=True,
903    idempotent=True,
904    open_world=True,
905    extra_help_text=CLOUD_AUTH_TIP_TEXT,
906)
907def describe_cloud_destination(
908    destination_id: Annotated[
909        str,
910        Field(description="The ID of the destination to describe."),
911    ],
912    *,
913    workspace_id: Annotated[
914        str | None,
915        Field(
916            description=WORKSPACE_ID_TIP_TEXT,
917            default=None,
918        ),
919    ],
920) -> CloudDestinationDetails:
921    """Get detailed information about a specific deployed destination connector."""
922    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
923    destination = workspace.get_destination(destination_id=destination_id)
924
925    # Access name property to ensure _connector_info is populated
926    destination_name = cast(str, destination.name)
927
928    return CloudDestinationDetails(
929        destination_id=destination.destination_id,
930        destination_name=destination_name,
931        destination_url=destination.connector_url,
932        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
933    )

Get detailed information about a specific deployed destination connector.
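
Example: the destination counterpart of the sketch above, again with a placeholder ID.

from airbyte.mcp.cloud_ops import describe_cloud_destination

details = describe_cloud_destination(
    destination_id="22222222-2222-2222-2222-222222222222",  # placeholder
    workspace_id=None,
)
print(details.destination_name, details.connector_definition_id, details.destination_url)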

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudConnectionDetails:
936@mcp_tool(
937    domain="cloud",
938    read_only=True,
939    idempotent=True,
940    open_world=True,
941    extra_help_text=CLOUD_AUTH_TIP_TEXT,
942)
943def describe_cloud_connection(
944    connection_id: Annotated[
945        str,
946        Field(description="The ID of the connection to describe."),
947    ],
948    *,
949    workspace_id: Annotated[
950        str | None,
951        Field(
952            description=WORKSPACE_ID_TIP_TEXT,
953            default=None,
954        ),
955    ],
956) -> CloudConnectionDetails:
957    """Get detailed information about a specific deployed connection."""
958    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
959    connection = workspace.get_connection(connection_id=connection_id)
960
961    return CloudConnectionDetails(
962        connection_id=connection.connection_id,
963        connection_name=cast(str, connection.name),
964        connection_url=cast(str, connection.connection_url),
965        source_id=connection.source_id,
966        source_name=cast(str, connection.source.name),
967        destination_id=connection.destination_id,
968        destination_name=cast(str, connection.destination.name),
969        selected_streams=connection.stream_names,
970        table_prefix=connection.table_prefix,
971    )

Get detailed information about a specific deployed connection.
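
Example: a minimal sketch with a placeholder connection ID.

from airbyte.mcp.cloud_ops import describe_cloud_connection

details = describe_cloud_connection(
    connection_id="33333333-3333-3333-3333-333333333333",  # placeholder
    workspace_id=None,
)
print(details.connection_name, details.selected_streams, details.table_prefix)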

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_logs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_lines: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=4000, description="Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.")], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`.")], line_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`.')]) -> LogReadResult:
 974@mcp_tool(
 975    domain="cloud",
 976    read_only=True,
 977    idempotent=True,
 978    open_world=True,
 979    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 980)
 981def get_cloud_sync_logs(
 982    connection_id: Annotated[
 983        str,
 984        Field(description="The ID of the Airbyte Cloud connection."),
 985    ],
 986    job_id: Annotated[
 987        int | None,
 988        Field(description="Optional job ID. If not provided, the latest job will be used."),
 989    ] = None,
 990    attempt_number: Annotated[
 991        int | None,
 992        Field(
 993            description="Optional attempt number. If not provided, the latest attempt will be used."
 994        ),
 995    ] = None,
 996    *,
 997    workspace_id: Annotated[
 998        str | None,
 999        Field(
1000            description=WORKSPACE_ID_TIP_TEXT,
1001            default=None,
1002        ),
1003    ],
1004    max_lines: Annotated[
1005        int,
1006        Field(
1007            description=(
1008                "Maximum number of lines to return. "
1009                "Defaults to 4000 if not specified. "
1010                "If '0' is provided, no limit is applied."
1011            ),
1012            default=4000,
1013        ),
1014    ],
1015    from_tail: Annotated[
1016        bool | None,
1017        Field(
1018            description=(
1019                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
1020                "Defaults to True if `line_offset` is not specified. "
1021                "Cannot combine `from_tail=True` with `line_offset`."
1022            ),
1023            default=None,
1024        ),
1025    ],
1026    line_offset: Annotated[
1027        int | None,
1028        Field(
1029            description=(
1030                "Number of lines to skip from the beginning of the logs. "
1031                "Cannot be combined with `from_tail=True`."
1032            ),
1033            default=None,
1034        ),
1035    ],
1036) -> LogReadResult:
1037    """Get the logs from a sync job attempt on Airbyte Cloud."""
1038    # Validate that line_offset and from_tail are not both set
1039    if line_offset is not None and from_tail:
1040        raise PyAirbyteInputError(
1041            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
1042            context={"line_offset": line_offset, "from_tail": from_tail},
1043        )
1044
1045    if from_tail is None and line_offset is None:
1046        from_tail = True
1047    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1048    connection = workspace.get_connection(connection_id=connection_id)
1049
1050    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
1051
1052    if not sync_result:
1053        raise AirbyteMissingResourceError(
1054            resource_type="sync job",
1055            resource_name_or_id=connection_id,
1056        )
1057
1058    attempts = sync_result.get_attempts()
1059
1060    if not attempts:
1061        raise AirbyteMissingResourceError(
1062            resource_type="sync attempt",
1063            resource_name_or_id=str(sync_result.job_id),
1064        )
1065
1066    if attempt_number is not None:
1067        target_attempt = None
1068        for attempt in attempts:
1069            if attempt.attempt_number == attempt_number:
1070                target_attempt = attempt
1071                break
1072
1073        if target_attempt is None:
1074            raise AirbyteMissingResourceError(
1075                resource_type="sync attempt",
1076                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
1077            )
1078    else:
1079        target_attempt = max(attempts, key=lambda a: a.attempt_number)
1080
1081    logs = target_attempt.get_full_log_text()
1082
1083    if not logs:
1084        # Return empty result with zero lines
1085        return LogReadResult(
1086            log_text=(
1087                f"[No logs available for job '{sync_result.job_id}', "
1088                f"attempt {target_attempt.attempt_number}.]"
1089            ),
1090            log_text_start_line=1,
1091            log_text_line_count=0,
1092            total_log_lines_available=0,
1093            job_id=sync_result.job_id,
1094            attempt_number=target_attempt.attempt_number,
1095        )
1096
1097    # Apply line limiting
1098    log_lines = logs.splitlines()
1099    total_lines = len(log_lines)
1100
1101    # Determine effective max_lines (0 means no limit)
1102    effective_max = total_lines if max_lines == 0 else max_lines
1103
1104    # Calculate start_index and slice based on from_tail or line_offset
1105    if from_tail:
1106        start_index = max(0, total_lines - effective_max)
1107        selected_lines = log_lines[start_index:][:effective_max]
1108    else:
1109        start_index = line_offset or 0
1110        selected_lines = log_lines[start_index : start_index + effective_max]
1111
1112    return LogReadResult(
1113        log_text="\n".join(selected_lines),
1114        log_text_start_line=start_index + 1,  # Convert to 1-based index
1115        log_text_line_count=len(selected_lines),
1116        total_log_lines_available=total_lines,
1117        job_id=sync_result.job_id,
1118        attempt_number=target_attempt.attempt_number,
1119    )

Get the logs from a sync job attempt on Airbyte Cloud.
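
Example: a minimal sketch that tails the last 200 log lines of the latest attempt of the latest job; the connection ID is a placeholder and Field-metadata defaults are passed explicitly.

from airbyte.mcp.cloud_ops import get_cloud_sync_logs

logs = get_cloud_sync_logs(
    connection_id="33333333-3333-3333-3333-333333333333",  # placeholder
    job_id=None,          # latest job
    attempt_number=None,  # latest attempt
    workspace_id=None,
    max_lines=200,
    from_tail=None,       # defaults to tailing because line_offset is unset
    line_offset=None,
)
print(logs.total_log_lines_available, logs.log_text_line_count)
print(logs.log_text)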

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_connections( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter connections by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')], with_connection_status: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description="If True, include status info for each connection's most recent sync job")], failing_connections_only: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description='If True, only return connections with failed/cancelled last sync')]) -> list[CloudConnectionResult]:
1122@mcp_tool(
1123    domain="cloud",
1124    read_only=True,
1125    idempotent=True,
1126    open_world=True,
1127    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1128)
1129def list_deployed_cloud_connections(
1130    *,
1131    workspace_id: Annotated[
1132        str | None,
1133        Field(
1134            description=WORKSPACE_ID_TIP_TEXT,
1135            default=None,
1136        ),
1137    ],
1138    name_contains: Annotated[
1139        str | None,
1140        Field(
1141            description="Optional case-insensitive substring to filter connections by name",
1142            default=None,
1143        ),
1144    ],
1145    max_items_limit: Annotated[
1146        int | None,
1147        Field(
1148            description="Optional maximum number of items to return (default: no limit)",
1149            default=None,
1150        ),
1151    ],
1152    with_connection_status: Annotated[
1153        bool | None,
1154        Field(
1155            description="If True, include status info for each connection's most recent sync job",
1156            default=False,
1157        ),
1158    ],
1159    failing_connections_only: Annotated[
1160        bool | None,
1161        Field(
1162            description="If True, only return connections with failed/cancelled last sync",
1163            default=False,
1164        ),
1165    ],
1166) -> list[CloudConnectionResult]:
1167    """List all deployed connections in the Airbyte Cloud workspace.
1168
1169    When with_connection_status is True, each connection result will include
1170    information about the most recent sync job status, skipping over any
1171    currently in-progress syncs to find the last completed job.
1172
1173    When failing_connections_only is True, only connections where the most
1174    recent completed sync job failed or was cancelled will be returned.
1175    This implicitly enables with_connection_status.
1176    """
1177    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1178    connections = workspace.list_connections()
1179
1180    # Filter by name if requested
1181    if name_contains:
1182        needle = name_contains.lower()
1183        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1184
1185    # If failing_connections_only is True, implicitly enable with_connection_status
1186    if failing_connections_only:
1187        with_connection_status = True
1188
1189    results: list[CloudConnectionResult] = []
1190
1191    for connection in connections:
1192        last_job_status: str | None = None
1193        last_job_id: int | None = None
1194        last_job_time: str | None = None
1195        currently_running_job_id: int | None = None
1196        currently_running_job_start_time: str | None = None
1197
1198        if with_connection_status:
1199            sync_logs = connection.get_previous_sync_logs(limit=5)
1200            last_completed_job_status = None  # Keep enum for comparison
1201
1202            for sync_result in sync_logs:
1203                job_status = sync_result.get_job_status()
1204
1205                if not sync_result.is_job_complete():
1206                    currently_running_job_id = sync_result.job_id
1207                    currently_running_job_start_time = sync_result.start_time.isoformat()
1208                    continue
1209
1210                last_completed_job_status = job_status
1211                last_job_status = str(job_status.value) if job_status else None
1212                last_job_id = sync_result.job_id
1213                last_job_time = sync_result.start_time.isoformat()
1214                break
1215
1216            if failing_connections_only and (
1217                last_completed_job_status is None
1218                or last_completed_job_status not in FAILED_STATUSES
1219            ):
1220                continue
1221
1222        results.append(
1223            CloudConnectionResult(
1224                id=connection.connection_id,
1225                name=cast(str, connection.name),
1226                url=cast(str, connection.connection_url),
1227                source_id=connection.source_id,
1228                destination_id=connection.destination_id,
1229                last_job_status=last_job_status,
1230                last_job_id=last_job_id,
1231                last_job_time=last_job_time,
1232                currently_running_job_id=currently_running_job_id,
1233                currently_running_job_start_time=currently_running_job_start_time,
1234            )
1235        )
1236
1237        if max_items_limit is not None and len(results) >= max_items_limit:
1238            break
1239
1240    return results

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.
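
Example: a minimal sketch that reports only connections whose last completed sync failed or was cancelled (which implicitly enables the status lookup).

from airbyte.mcp.cloud_ops import list_deployed_cloud_connections

failing = list_deployed_cloud_connections(
    workspace_id=None,
    name_contains=None,
    max_items_limit=None,
    with_connection_status=True,
    failing_connections_only=True,
)
for connection in failing:
    print(connection.name, connection.last_job_status, connection.url)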

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_workspaces( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional substring to filter workspaces by name (server-side filtering)')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudWorkspaceResult]:
1347@mcp_tool(
1348    domain="cloud",
1349    read_only=True,
1350    idempotent=True,
1351    open_world=True,
1352    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1353)
1354def list_cloud_workspaces(
1355    *,
1356    organization_id: Annotated[
1357        str | None,
1358        Field(
1359            description="Organization ID. Required if organization_name is not provided.",
1360            default=None,
1361        ),
1362    ],
1363    organization_name: Annotated[
1364        str | None,
1365        Field(
1366            description=(
1367                "Organization name (exact match). " "Required if organization_id is not provided."
1368            ),
1369            default=None,
1370        ),
1371    ],
1372    name_contains: Annotated[
1373        str | None,
1374        Field(
1375            description="Optional substring to filter workspaces by name (server-side filtering)",
1376            default=None,
1377        ),
1378    ],
1379    max_items_limit: Annotated[
1380        int | None,
1381        Field(
1382            description="Optional maximum number of items to return (default: no limit)",
1383            default=None,
1384        ),
1385    ],
1386) -> list[CloudWorkspaceResult]:
1387    """List all workspaces in a specific organization.
1388
1389    Requires either organization_id OR organization_name (exact match) to be provided.
1390    This tool will NOT list workspaces across all organizations - you must specify
1391    which organization to list workspaces from.
1392    """
1393    credentials = resolve_cloud_credentials()
1394
1395    resolved_org_id = _resolve_organization_id(
1396        organization_id=organization_id,
1397        organization_name=organization_name,
1398        api_root=credentials.api_root,
1399        client_id=credentials.client_id,
1400        client_secret=credentials.client_secret,
1401        bearer_token=credentials.bearer_token,
1402    )
1403
1404    workspaces = api_util.list_workspaces_in_organization(
1405        organization_id=resolved_org_id,
1406        api_root=credentials.api_root,
1407        client_id=credentials.client_id,
1408        client_secret=credentials.client_secret,
1409        bearer_token=credentials.bearer_token,
1410        name_contains=name_contains,
1411        max_items_limit=max_items_limit,
1412    )
1413
1414    return [
1415        CloudWorkspaceResult(
1416            workspace_id=ws.get("workspaceId", ""),
1417            workspace_name=ws.get("name", ""),
1418            organization_id=ws.get("organizationId", ""),
1419        )
1420        for ws in workspaces
1421    ]

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.
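
Example: a minimal sketch that lists workspaces by organization name; "Acme Corp" is a placeholder.

from airbyte.mcp.cloud_ops import list_cloud_workspaces

workspaces = list_cloud_workspaces(
    organization_id=None,
    organization_name="Acme Corp",  # placeholder; must be an exact match
    name_contains=None,
    max_items_limit=50,
)
for ws in workspaces:
    print(ws.workspace_id, ws.workspace_name, ws.organization_id)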

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_organization( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')]) -> CloudOrganizationResult:
1424@mcp_tool(
1425    domain="cloud",
1426    read_only=True,
1427    idempotent=True,
1428    open_world=True,
1429    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1430)
1431def describe_cloud_organization(
1432    *,
1433    organization_id: Annotated[
1434        str | None,
1435        Field(
1436            description="Organization ID. Required if organization_name is not provided.",
1437            default=None,
1438        ),
1439    ],
1440    organization_name: Annotated[
1441        str | None,
1442        Field(
1443            description=(
1444                "Organization name (exact match). " "Required if organization_id is not provided."
1445            ),
1446            default=None,
1447        ),
1448    ],
1449) -> CloudOrganizationResult:
1450    """Get details about a specific organization.
1451
1452    Requires either organization_id OR organization_name (exact match) to be provided.
1453    This tool is useful for looking up an organization's ID from its name, or vice versa.
1454    """
1455    credentials = resolve_cloud_credentials()
1456
1457    org = _resolve_organization(
1458        organization_id=organization_id,
1459        organization_name=organization_name,
1460        api_root=credentials.api_root,
1461        client_id=credentials.client_id,
1462        client_secret=credentials.client_secret,
1463        bearer_token=credentials.bearer_token,
1464    )
1465
1466    return CloudOrganizationResult(
1467        id=org.organization_id,
1468        name=org.organization_name,
1469        email=org.email,
1470    )

Get details about a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool is useful for looking up an organization's ID from its name, or vice versa.
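
Example: a minimal sketch that looks up an organization's ID from its (placeholder) name.

from airbyte.mcp.cloud_ops import describe_cloud_organization

org = describe_cloud_organization(
    organization_id=None,
    organization_name="Acme Corp",  # placeholder; exact match
)
print(org.id, org.name, org.email)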

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def publish_custom_source_definition( name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
1487@mcp_tool(
1488    domain="cloud",
1489    open_world=True,
1490    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1491)
1492def publish_custom_source_definition(
1493    name: Annotated[
1494        str,
1495        Field(description="The name for the custom connector definition."),
1496    ],
1497    *,
1498    workspace_id: Annotated[
1499        str | None,
1500        Field(
1501            description=WORKSPACE_ID_TIP_TEXT,
1502            default=None,
1503        ),
1504    ],
1505    manifest_yaml: Annotated[
1506        str | Path | None,
1507        Field(
1508            description=(
1509                "The Low-code CDK manifest as a YAML string or file path. "
1510                "Required for YAML connectors."
1511            ),
1512            default=None,
1513        ),
1514    ] = None,
1515    unique: Annotated[
1516        bool,
1517        Field(
1518            description="Whether to require a unique name.",
1519            default=True,
1520        ),
1521    ] = True,
1522    pre_validate: Annotated[
1523        bool,
1524        Field(
1525            description="Whether to validate the manifest client-side before publishing.",
1526            default=True,
1527        ),
1528    ] = True,
1529    testing_values: Annotated[
1530        dict | str | None,
1531        Field(
1532            description=(
1533                "Optional testing configuration values for the Builder UI. "
1534                "Can be provided as a JSON object or JSON string. "
1535                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1536                "If provided, these values replace any existing testing values "
1537                "for the connector builder project, allowing immediate test read operations."
1538            ),
1539            default=None,
1540        ),
1541    ],
1542    testing_values_secret_name: Annotated[
1543        str | None,
1544        Field(
1545            description=(
1546                "Optional name of a secret containing testing configuration values "
1547                "in JSON or YAML format. The secret will be resolved by the MCP "
1548                "server and merged into testing_values, with secret values taking "
1549                "precedence. This lets the agent reference secrets without sending "
1550                "raw values as tool arguments."
1551            ),
1552            default=None,
1553        ),
1554    ],
1555) -> str:
1556    """Publish a custom YAML source connector definition to Airbyte Cloud.
1557
1558    Note: Only YAML (declarative) connectors are currently supported.
1559    Docker-based custom sources are not yet available.
1560    """
1561    processed_manifest = manifest_yaml
1562    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1563        processed_manifest = Path(manifest_yaml)
1564
1565    # Resolve testing values from inline config and/or secret
1566    testing_values_dict: dict[str, Any] | None = None
1567    if testing_values is not None or testing_values_secret_name is not None:
1568        testing_values_dict = (
1569            resolve_config(
1570                config=testing_values,
1571                config_secret_name=testing_values_secret_name,
1572            )
1573            or None
1574        )
1575
1576    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1577    custom_source = workspace.publish_custom_source_definition(
1578        name=name,
1579        manifest_yaml=processed_manifest,
1580        unique=unique,
1581        pre_validate=pre_validate,
1582        testing_values=testing_values_dict,
1583    )
1584    register_guid_created_in_session(custom_source.definition_id)
1585    return (
1586        "Successfully published custom YAML source definition:\n"
1587        + _get_custom_source_definition_description(
1588            custom_source=custom_source,
1589        )
1590        + "\n"
1591    )

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
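
Example: a minimal sketch that publishes a manifest from a local file and seeds testing values with an inline secret reference; the path, name, and env var are placeholders. Single-line strings passed as manifest_yaml are treated as file paths.

from airbyte.mcp.cloud_ops import publish_custom_source_definition

print(
    publish_custom_source_definition(
        name="source-my-api",                          # placeholder definition name
        workspace_id=None,
        manifest_yaml="manifests/source_my_api.yaml",  # placeholder path; single-line -> Path
        unique=True,
        pre_validate=True,
        testing_values={"api_key": "secret_reference::MY_API_KEY"},  # placeholder env var
        testing_values_secret_name=None,
    )
)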

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> list[dict[str, typing.Any]]:
1594@mcp_tool(
1595    domain="cloud",
1596    read_only=True,
1597    idempotent=True,
1598    open_world=True,
1599)
1600def list_custom_source_definitions(
1601    *,
1602    workspace_id: Annotated[
1603        str | None,
1604        Field(
1605            description=WORKSPACE_ID_TIP_TEXT,
1606            default=None,
1607        ),
1608    ],
1609) -> list[dict[str, Any]]:
1610    """List custom YAML source definitions in the Airbyte Cloud workspace.
1611
1612    Note: Only YAML (declarative) connectors are currently supported.
1613    Docker-based custom sources are not yet available.
1614    """
1615    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1616    definitions = workspace.list_custom_source_definitions(
1617        definition_type="yaml",
1618    )
1619
1620    return [
1621        {
1622            "definition_id": d.definition_id,
1623            "name": d.name,
1624            "version": d.version,
1625            "connector_builder_project_url": d.connector_builder_project_url,
1626        }
1627        for d in definitions
1628    ]

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
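
Example: a minimal sketch listing YAML definitions in the default workspace.

from airbyte.mcp.cloud_ops import list_custom_source_definitions

for definition in list_custom_source_definitions(workspace_id=None):
    print(definition["definition_id"], definition["name"], definition["version"])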

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def get_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to retrieve.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any]:
1631@mcp_tool(
1632    domain="cloud",
1633    read_only=True,
1634    idempotent=True,
1635    open_world=True,
1636)
1637def get_custom_source_definition(
1638    definition_id: Annotated[
1639        str,
1640        Field(description="The ID of the custom source definition to retrieve."),
1641    ],
1642    *,
1643    workspace_id: Annotated[
1644        str | None,
1645        Field(
1646            description=WORKSPACE_ID_TIP_TEXT,
1647            default=None,
1648        ),
1649    ],
1650) -> dict[str, Any]:
1651    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.
1652
1653    Returns the full definition details including the manifest YAML content,
1654    which can be used to inspect or store the connector configuration locally.
1655
1656    Note: Only YAML (declarative) connectors are currently supported.
1657    Docker-based custom sources are not yet available.
1658    """
1659    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1660    definition = workspace.get_custom_source_definition(
1661        definition_id=definition_id,
1662        definition_type="yaml",
1663    )
1664
1665    return {
1666        "definition_id": definition.definition_id,
1667        "name": definition.name,
1668        "version": definition.version,
1669        "connector_builder_project_id": definition.connector_builder_project_id,
1670        "connector_builder_project_url": definition.connector_builder_project_url,
1671        "manifest": definition.manifest,
1672    }

Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the manifest YAML content, which can be used to inspect or store the connector configuration locally.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
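
Example: a minimal sketch with a placeholder definition ID; the returned "manifest" entry can be inspected or persisted locally.

from airbyte.mcp.cloud_ops import get_custom_source_definition

definition = get_custom_source_definition(
    definition_id="44444444-4444-4444-4444-444444444444",  # placeholder
    workspace_id=None,
)
print(definition["name"], definition["version"])
manifest = definition["manifest"]  # full manifest content for local storage or review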

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='New manifest as YAML string or file path. Optional; omit to update only testing values.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
1675@mcp_tool(
1676    domain="cloud",
1677    destructive=True,
1678    open_world=True,
1679)
1680def update_custom_source_definition(
1681    definition_id: Annotated[
1682        str,
1683        Field(description="The ID of the definition to update."),
1684    ],
1685    manifest_yaml: Annotated[
1686        str | Path | None,
1687        Field(
1688            description=(
1689                "New manifest as YAML string or file path. "
1690                "Optional; omit to update only testing values."
1691            ),
1692            default=None,
1693        ),
1694    ] = None,
1695    *,
1696    workspace_id: Annotated[
1697        str | None,
1698        Field(
1699            description=WORKSPACE_ID_TIP_TEXT,
1700            default=None,
1701        ),
1702    ],
1703    pre_validate: Annotated[
1704        bool,
1705        Field(
1706            description="Whether to validate the manifest client-side before updating.",
1707            default=True,
1708        ),
1709    ] = True,
1710    testing_values: Annotated[
1711        dict | str | None,
1712        Field(
1713            description=(
1714                "Optional testing configuration values for the Builder UI. "
1715                "Can be provided as a JSON object or JSON string. "
1716                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
1717                "If provided, these values replace any existing testing values "
1718                "for the connector builder project. The entire testing values object "
1719                "is overwritten, so pass the full set of values you want to persist."
1720            ),
1721            default=None,
1722        ),
1723    ],
1724    testing_values_secret_name: Annotated[
1725        str | None,
1726        Field(
1727            description=(
1728                "Optional name of a secret containing testing configuration values "
1729                "in JSON or YAML format. The secret will be resolved by the MCP "
1730                "server and merged into testing_values, with secret values taking "
1731                "precedence. This lets the agent reference secrets without sending "
1732                "raw values as tool arguments."
1733            ),
1734            default=None,
1735        ),
1736    ],
1737) -> str:
1738    """Update a custom YAML source definition in Airbyte Cloud.
1739
1740    Updates the manifest and/or testing values for an existing custom source definition.
1741    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1742    """
1743    check_guid_created_in_session(definition_id)
1744
1745    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1746
1747    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
1748        raise PyAirbyteInputError(
1749            message=(
1750                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
1751                "must be provided to update a custom source definition."
1752            ),
1753            context={
1754                "definition_id": definition_id,
1755                "workspace_id": workspace.workspace_id,
1756            },
1757        )
1758
1759    processed_manifest: str | Path | None = manifest_yaml
1760    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
1761        processed_manifest = Path(manifest_yaml)
1762
1763    # Resolve testing values from inline config and/or secret
1764    testing_values_dict: dict[str, Any] | None = None
1765    if testing_values is not None or testing_values_secret_name is not None:
1766        testing_values_dict = (
1767            resolve_config(
1768                config=testing_values,
1769                config_secret_name=testing_values_secret_name,
1770            )
1771            or None
1772        )
1773
1774    definition = workspace.get_custom_source_definition(
1775        definition_id=definition_id,
1776        definition_type="yaml",
1777    )
1778    custom_source: CustomCloudSourceDefinition = definition
1779
1780    if processed_manifest is not None:
1781        custom_source = definition.update_definition(
1782            manifest_yaml=processed_manifest,
1783            pre_validate=pre_validate,
1784        )
1785
1786    if testing_values_dict is not None:
1787        custom_source.set_testing_values(testing_values_dict)
1788
1789    return (
1790        "Successfully updated custom YAML source definition:\n"
1791        + _get_custom_source_definition_description(
1792            custom_source=custom_source,
1793        )
1794    )

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
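
For illustration, a minimal sketch of invoking this tool as a plain Python call. These tools are normally invoked by an agent through the MCP server; calling the module-level function directly assumes the @mcp_tool decorator leaves it callable and that the AIRBYTE_CLOUD_* environment variables are set. The definition ID and manifest path are placeholders, and the keyword-only parameters are passed explicitly because their defaults live in the Field metadata rather than on the function signature itself.

    from airbyte.mcp.cloud_ops import update_custom_source_definition

    result = update_custom_source_definition(
        definition_id="11111111-2222-3333-4444-555555555555",  # placeholder
        manifest_yaml="./manifest.yaml",  # a single-line string is treated as a file path
        workspace_id=None,  # fall back to AIRBYTE_CLOUD_WORKSPACE_ID
        pre_validate=True,  # validate the manifest client-side before updating
        testing_values={"api_key": "secret_reference::MY_API_KEY"},  # inline secret ref
        testing_values_secret_name=None,
    )
    print(result)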

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def permanently_delete_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the custom source definition (for verification).')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
1797@mcp_tool(
1798    domain="cloud",
1799    destructive=True,
1800    open_world=True,
1801)
1802def permanently_delete_custom_source_definition(
1803    definition_id: Annotated[
1804        str,
1805        Field(description="The ID of the custom source definition to delete."),
1806    ],
1807    name: Annotated[
1808        str,
1809        Field(description="The expected name of the custom source definition (for verification)."),
1810    ],
1811    *,
1812    workspace_id: Annotated[
1813        str | None,
1814        Field(
1815            description=WORKSPACE_ID_TIP_TEXT,
1816            default=None,
1817        ),
1818    ],
1819) -> str:
1820    """Permanently delete a custom YAML source definition from Airbyte Cloud.
1821
1822    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
1823    (case insensitive).
1824
1825    If the connector does not meet this requirement, the deletion will be rejected with a
1826    helpful error message. Instruct the user to rename the connector appropriately to authorize
1827    the deletion.
1828
1829    The provided name must match the actual name of the definition for the operation to proceed.
1830    This is a safety measure to ensure you are deleting the correct resource.
1831
1832    Note: Only YAML (declarative) connectors are currently supported.
1833    Docker-based custom sources are not yet available.
1834    """
1835    check_guid_created_in_session(definition_id)
1836    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1837    definition = workspace.get_custom_source_definition(
1838        definition_id=definition_id,
1839        definition_type="yaml",
1840    )
1841    actual_name: str = definition.name
1842
1843    # Verify the name matches
1844    if actual_name != name:
1845        raise PyAirbyteInputError(
1846            message=(
1847                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1848                "The provided name must exactly match the definition's actual name. "
1849                "This is a safety measure to prevent accidental deletion."
1850            ),
1851            context={
1852                "definition_id": definition_id,
1853                "expected_name": name,
1854                "actual_name": actual_name,
1855            },
1856        )
1857
1858    definition.permanently_delete(
1859        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
1860    )
1861    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
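
A direct-call sketch under the same assumptions as the earlier example. The definition ID is a placeholder; the name passed for verification must exactly match the definition's current name, which in turn must already contain the delete marker. The check_guid_created_in_session guard shown in the source may apply additional session-scoped restrictions not covered here.

    from airbyte.mcp.cloud_ops import permanently_delete_custom_source_definition

    print(
        permanently_delete_custom_source_definition(
            definition_id="11111111-2222-3333-4444-555555555555",  # placeholder
            name="my-custom-source-delete-me",  # must match exactly and contain "delete-me"
            workspace_id=None,  # defaults to AIRBYTE_CLOUD_WORKSPACE_ID
        )
    )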

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the source (for verification).')]) -> str:
1864@mcp_tool(
1865    domain="cloud",
1866    destructive=True,
1867    open_world=True,
1868    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1869)
1870def permanently_delete_cloud_source(
1871    source_id: Annotated[
1872        str,
1873        Field(description="The ID of the deployed source to delete."),
1874    ],
1875    name: Annotated[
1876        str,
1877        Field(description="The expected name of the source (for verification)."),
1878    ],
1879) -> str:
1880    """Permanently delete a deployed source connector from Airbyte Cloud.
1881
1882    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
1883    (case insensitive).
1884
1885    If the source does not meet this requirement, the deletion will be rejected with a
1886    helpful error message. Instruct the user to rename the source appropriately to authorize
1887    the deletion.
1888
1889    The provided name must match the actual name of the source for the operation to proceed.
1890    This is a safety measure to ensure you are deleting the correct resource.
1891    """
1892    check_guid_created_in_session(source_id)
1893    workspace: CloudWorkspace = _get_cloud_workspace()
1894    source = workspace.get_source(source_id=source_id)
1895    actual_name: str = cast(str, source.name)
1896
1897    # Verify the name matches
1898    if actual_name != name:
1899        raise PyAirbyteInputError(
1900            message=(
1901                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1902                "The provided name must exactly match the source's actual name. "
1903                "This is a safety measure to prevent accidental deletion."
1904            ),
1905            context={
1906                "source_id": source_id,
1907                "expected_name": name,
1908                "actual_name": actual_name,
1909            },
1910        )
1911
1912    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1913    workspace.permanently_delete_source(
1914        source=source_id,
1915        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
1916    )
1917    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.

The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
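
Because safe mode requires the delete marker in the name, one possible flow is to rename the source first (using the rename_cloud_source tool documented below) and then delete it. A sketch with a placeholder source ID, under the same direct-call assumptions as above:

    from airbyte.mcp.cloud_ops import permanently_delete_cloud_source, rename_cloud_source

    SOURCE_ID = "11111111-2222-3333-4444-555555555555"  # placeholder

    # Step 1: add the delete marker so safe mode will authorize the deletion.
    rename_cloud_source(source_id=SOURCE_ID, name="source-faker-delete-me", workspace_id=None)

    # Step 2: delete, passing the new exact name for verification.
    print(permanently_delete_cloud_source(source_id=SOURCE_ID, name="source-faker-delete-me"))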

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the destination (for verification).')]) -> str:
1920@mcp_tool(
1921    domain="cloud",
1922    destructive=True,
1923    open_world=True,
1924    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1925)
1926def permanently_delete_cloud_destination(
1927    destination_id: Annotated[
1928        str,
1929        Field(description="The ID of the deployed destination to delete."),
1930    ],
1931    name: Annotated[
1932        str,
1933        Field(description="The expected name of the destination (for verification)."),
1934    ],
1935) -> str:
1936    """Permanently delete a deployed destination connector from Airbyte Cloud.
1937
1938    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
1939    (case insensitive).
1940
1941    If the destination does not meet this requirement, the deletion will be rejected with a
1942    helpful error message. Instruct the user to rename the destination appropriately to authorize
1943    the deletion.
1944
1945    The provided name must match the actual name of the destination for the operation to proceed.
1946    This is a safety measure to ensure you are deleting the correct resource.
1947    """
1948    check_guid_created_in_session(destination_id)
1949    workspace: CloudWorkspace = _get_cloud_workspace()
1950    destination = workspace.get_destination(destination_id=destination_id)
1951    actual_name: str = cast(str, destination.name)
1952
1953    # Verify the name matches
1954    if actual_name != name:
1955        raise PyAirbyteInputError(
1956            message=(
1957                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
1958                "The provided name must exactly match the destination's actual name. "
1959                "This is a safety measure to prevent accidental deletion."
1960            ),
1961            context={
1962                "destination_id": destination_id,
1963                "expected_name": name,
1964                "actual_name": actual_name,
1965            },
1966        )
1967
1968    # Safe mode is hard-coded to True for extra protection when running in LLM agents
1969    workspace.permanently_delete_destination(
1970        destination=destination_id,
1971        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
1972    )
1973    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.

The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the connection (for verification).')], *, cascade_delete_source: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the source connector associated with this connection.')] = False, cascade_delete_destination: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the destination connector associated with this connection.')] = False) -> str:
1976@mcp_tool(
1977    domain="cloud",
1978    destructive=True,
1979    open_world=True,
1980    extra_help_text=CLOUD_AUTH_TIP_TEXT,
1981)
1982def permanently_delete_cloud_connection(
1983    connection_id: Annotated[
1984        str,
1985        Field(description="The ID of the connection to delete."),
1986    ],
1987    name: Annotated[
1988        str,
1989        Field(description="The expected name of the connection (for verification)."),
1990    ],
1991    *,
1992    cascade_delete_source: Annotated[
1993        bool,
1994        Field(
1995            description=(
1996                "Whether to also delete the source connector associated with this connection."
1997            ),
1998            default=False,
1999        ),
2000    ] = False,
2001    cascade_delete_destination: Annotated[
2002        bool,
2003        Field(
2004            description=(
2005                "Whether to also delete the destination connector associated with this connection."
2006            ),
2007            default=False,
2008        ),
2009    ] = False,
2010) -> str:
2011    """Permanently delete a connection from Airbyte Cloud.
2012
2013    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
2014    (case insensitive).
2015
2016    If the connection does not meet this requirement, the deletion will be rejected with a
2017    helpful error message. Instruct the user to rename the connection appropriately to authorize
2018    the deletion.
2019
2020    The provided name must match the actual name of the connection for the operation to proceed.
2021    This is a safety measure to ensure you are deleting the correct resource.
2022    """
2023    check_guid_created_in_session(connection_id)
2024    workspace: CloudWorkspace = _get_cloud_workspace()
2025    connection = workspace.get_connection(connection_id=connection_id)
2026    actual_name: str = cast(str, connection.name)
2027
2028    # Verify the name matches
2029    if actual_name != name:
2030        raise PyAirbyteInputError(
2031            message=(
2032                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
2033                "The provided name must exactly match the connection's actual name. "
2034                "This is a safety measure to prevent accidental deletion."
2035            ),
2036            context={
2037                "connection_id": connection_id,
2038                "expected_name": name,
2039                "actual_name": actual_name,
2040            },
2041        )
2042
2043    # Safe mode is hard-coded to True for extra protection when running in LLM agents
2044    workspace.permanently_delete_connection(
2045        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
2046        connection=connection_id,
2047        cascade_delete_source=cascade_delete_source,
2048        cascade_delete_destination=cascade_delete_destination,
2049    )
2050    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.

The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
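
A sketch of deleting a connection together with its connectors by setting the cascade flags (placeholder ID; same direct-call assumptions as the earlier examples):

    from airbyte.mcp.cloud_ops import permanently_delete_cloud_connection

    print(
        permanently_delete_cloud_connection(
            connection_id="11111111-2222-3333-4444-555555555555",  # placeholder
            name="faker-to-duckdb-delete-me",  # must match the connection's current name
            cascade_delete_source=True,        # also remove the connection's source
            cascade_delete_destination=True,   # also remove the connection's destination
        )
    )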

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2053@mcp_tool(
2054    domain="cloud",
2055    open_world=True,
2056    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2057)
2058def rename_cloud_source(
2059    source_id: Annotated[
2060        str,
2061        Field(description="The ID of the deployed source to rename."),
2062    ],
2063    name: Annotated[
2064        str,
2065        Field(description="New name for the source."),
2066    ],
2067    *,
2068    workspace_id: Annotated[
2069        str | None,
2070        Field(
2071            description=WORKSPACE_ID_TIP_TEXT,
2072            default=None,
2073        ),
2074    ],
2075) -> str:
2076    """Rename a deployed source connector on Airbyte Cloud."""
2077    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2078    source = workspace.get_source(source_id=source_id)
2079    source.rename(name=name)
2080    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"

Rename a deployed source connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2083@mcp_tool(
2084    domain="cloud",
2085    destructive=True,
2086    open_world=True,
2087    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2088)
2089def update_cloud_source_config(
2090    source_id: Annotated[
2091        str,
2092        Field(description="The ID of the deployed source to update."),
2093    ],
2094    config: Annotated[
2095        dict | str,
2096        Field(
2097            description="New configuration for the source connector.",
2098        ),
2099    ],
2100    config_secret_name: Annotated[
2101        str | None,
2102        Field(
2103            description="The name of the secret containing the configuration.",
2104            default=None,
2105        ),
2106    ] = None,
2107    *,
2108    workspace_id: Annotated[
2109        str | None,
2110        Field(
2111            description=WORKSPACE_ID_TIP_TEXT,
2112            default=None,
2113        ),
2114    ],
2115) -> str:
2116    """Update a deployed source connector's configuration on Airbyte Cloud.
2117
2118    This is a destructive operation that can break existing connections if the
2119    configuration is changed incorrectly. Use with caution.
2120    """
2121    check_guid_created_in_session(source_id)
2122    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2123    source = workspace.get_source(source_id=source_id)
2124
2125    config_dict = resolve_config(
2126        config=config,
2127        config_secret_name=config_secret_name,
2128        config_spec_jsonschema=None,  # We don't have the spec here
2129    )
2130
2131    source.update_config(config=config_dict)
2132    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
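
A sketch of pushing a new configuration to a deployed source (placeholder ID and config values; same direct-call assumptions as above). Sensitive values can instead be supplied through config_secret_name so they never appear as raw tool arguments.

    from airbyte.mcp.cloud_ops import update_cloud_source_config

    print(
        update_cloud_source_config(
            source_id="11111111-2222-3333-4444-555555555555",  # placeholder
            config={"count": 1_000, "seed": 42},  # placeholder config for a hypothetical faker source
            config_secret_name=None,              # or the name of a secret holding the config
            workspace_id=None,                    # defaults to AIRBYTE_CLOUD_WORKSPACE_ID
        )
    )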

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2135@mcp_tool(
2136    domain="cloud",
2137    open_world=True,
2138    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2139)
2140def rename_cloud_destination(
2141    destination_id: Annotated[
2142        str,
2143        Field(description="The ID of the deployed destination to rename."),
2144    ],
2145    name: Annotated[
2146        str,
2147        Field(description="New name for the destination."),
2148    ],
2149    *,
2150    workspace_id: Annotated[
2151        str | None,
2152        Field(
2153            description=WORKSPACE_ID_TIP_TEXT,
2154            default=None,
2155        ),
2156    ],
2157) -> str:
2158    """Rename a deployed destination connector on Airbyte Cloud."""
2159    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2160    destination = workspace.get_destination(destination_id=destination_id)
2161    destination.rename(name=name)
2162    return (
2163        f"Successfully renamed destination '{destination_id}' to '{name}'. "
2164        f"URL: {destination.connector_url}"
2165    )

Rename a deployed destination connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_destination_config( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2168@mcp_tool(
2169    domain="cloud",
2170    destructive=True,
2171    open_world=True,
2172    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2173)
2174def update_cloud_destination_config(
2175    destination_id: Annotated[
2176        str,
2177        Field(description="The ID of the deployed destination to update."),
2178    ],
2179    config: Annotated[
2180        dict | str,
2181        Field(
2182            description="New configuration for the destination connector.",
2183        ),
2184    ],
2185    config_secret_name: Annotated[
2186        str | None,
2187        Field(
2188            description="The name of the secret containing the configuration.",
2189            default=None,
2190        ),
2191    ],
2192    *,
2193    workspace_id: Annotated[
2194        str | None,
2195        Field(
2196            description=WORKSPACE_ID_TIP_TEXT,
2197            default=None,
2198        ),
2199    ],
2200) -> str:
2201    """Update a deployed destination connector's configuration on Airbyte Cloud.
2202
2203    This is a destructive operation that can break existing connections if the
2204    configuration is changed incorrectly. Use with caution.
2205    """
2206    check_guid_created_in_session(destination_id)
2207    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2208    destination = workspace.get_destination(destination_id=destination_id)
2209
2210    config_dict = resolve_config(
2211        config=config,
2212        config_secret_name=config_secret_name,
2213        config_spec_jsonschema=None,  # We don't have the spec here
2214    )
2215
2216    destination.update_config(config=config_dict)
2217    return (
2218        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
2219    )

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2222@mcp_tool(
2223    domain="cloud",
2224    open_world=True,
2225    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2226)
2227def rename_cloud_connection(
2228    connection_id: Annotated[
2229        str,
2230        Field(description="The ID of the connection to rename."),
2231    ],
2232    name: Annotated[
2233        str,
2234        Field(description="New name for the connection."),
2235    ],
2236    *,
2237    workspace_id: Annotated[
2238        str | None,
2239        Field(
2240            description=WORKSPACE_ID_TIP_TEXT,
2241            default=None,
2242        ),
2243    ],
2244) -> str:
2245    """Rename a connection on Airbyte Cloud."""
2246    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2247    connection = workspace.get_connection(connection_id=connection_id)
2248    connection.rename(name=name)
2249    return (
2250        f"Successfully renamed connection '{connection_id}' to '{name}'. "
2251        f"URL: {connection.connection_url}"
2252    )

Rename a connection on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2255@mcp_tool(
2256    domain="cloud",
2257    destructive=True,
2258    open_world=True,
2259    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2260)
2261def set_cloud_connection_table_prefix(
2262    connection_id: Annotated[
2263        str,
2264        Field(description="The ID of the connection to update."),
2265    ],
2266    prefix: Annotated[
2267        str,
2268        Field(description="New table prefix to use when syncing to the destination."),
2269    ],
2270    *,
2271    workspace_id: Annotated[
2272        str | None,
2273        Field(
2274            description=WORKSPACE_ID_TIP_TEXT,
2275            default=None,
2276        ),
2277    ],
2278) -> str:
2279    """Set the table prefix for a connection on Airbyte Cloud.
2280
2281    This is a destructive operation that can break downstream dependencies if the
2282    table prefix is changed incorrectly. Use with caution.
2283    """
2284    check_guid_created_in_session(connection_id)
2285    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2286    connection = workspace.get_connection(connection_id=connection_id)
2287    connection.set_table_prefix(prefix=prefix)
2288    return (
2289        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
2290        f"URL: {connection.connection_url}"
2291    )

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
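
A one-call sketch with a placeholder connection ID, under the same direct-call assumptions as the earlier examples:

    from airbyte.mcp.cloud_ops import set_cloud_connection_table_prefix

    print(
        set_cloud_connection_table_prefix(
            connection_id="11111111-2222-3333-4444-555555555555",  # placeholder
            prefix="raw_",       # prefix applied to destination tables when syncing
            workspace_id=None,   # defaults to AIRBYTE_CLOUD_WORKSPACE_ID
        )
    )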

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2294@mcp_tool(
2295    domain="cloud",
2296    destructive=True,
2297    open_world=True,
2298    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2299)
2300def set_cloud_connection_selected_streams(
2301    connection_id: Annotated[
2302        str,
2303        Field(description="The ID of the connection to update."),
2304    ],
2305    stream_names: Annotated[
2306        str | list[str],
2307        Field(
2308            description=(
2309                "The selected stream names to sync within the connection. "
2310                "Must be an explicit stream name or list of streams."
2311            )
2312        ),
2313    ],
2314    *,
2315    workspace_id: Annotated[
2316        str | None,
2317        Field(
2318            description=WORKSPACE_ID_TIP_TEXT,
2319            default=None,
2320        ),
2321    ],
2322) -> str:
2323    """Set the selected streams for a connection on Airbyte Cloud.
2324
2325    This is a destructive operation that can break existing connections if the
2326    stream selection is changed incorrectly. Use with caution.
2327    """
2328    check_guid_created_in_session(connection_id)
2329    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2330    connection = workspace.get_connection(connection_id=connection_id)
2331
2332    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
2333    connection.set_selected_streams(stream_names=resolved_streams_list)
2334
2335    return (
2336        f"Successfully set selected streams for connection '{connection_id}' "
2337        f"to {resolved_streams_list}. URL: {connection.connection_url}"
2338    )

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
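
A sketch that restricts the connection to two explicitly named streams (placeholder ID and stream names; same direct-call assumptions as above). Per the parameter type, a single stream name may also be passed as a plain string, which resolve_list_of_strings normalizes to a list.

    from airbyte.mcp.cloud_ops import set_cloud_connection_selected_streams

    print(
        set_cloud_connection_selected_streams(
            connection_id="11111111-2222-3333-4444-555555555555",  # placeholder
            stream_names=["users", "products"],  # explicit stream names to sync
            workspace_id=None,
        )
    )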

@mcp_tool(domain='cloud', open_world=True, destructive=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], *, enabled: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status.")], cron_expression: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'.")], manual_schedule: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'.")], workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
2341@mcp_tool(
2342    domain="cloud",
2343    open_world=True,
2344    destructive=True,
2345    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2346)
2347def update_cloud_connection(
2348    connection_id: Annotated[
2349        str,
2350        Field(description="The ID of the connection to update."),
2351    ],
2352    *,
2353    enabled: Annotated[
2354        bool | None,
2355        Field(
2356            description=(
2357                "Set the connection's enabled status. "
2358                "True enables the connection (status='active'), "
2359                "False disables it (status='inactive'). "
2360                "Leave unset to keep the current status."
2361            ),
2362            default=None,
2363        ),
2364    ],
2365    cron_expression: Annotated[
2366        str | None,
2367        Field(
2368            description=(
2369                "A cron expression defining when syncs should run. "
2370                "Examples: '0 0 * * *' (daily at midnight UTC), "
2371                "'0 */6 * * *' (every 6 hours), "
2372                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
2373                "Leave unset to keep the current schedule. "
2374                "Cannot be used together with 'manual_schedule'."
2375            ),
2376            default=None,
2377        ),
2378    ],
2379    manual_schedule: Annotated[
2380        bool | None,
2381        Field(
2382            description=(
2383                "Set to True to disable automatic syncs (manual scheduling only). "
2384                "Syncs will only run when manually triggered. "
2385                "Cannot be used together with 'cron_expression'."
2386            ),
2387            default=None,
2388        ),
2389    ],
2390    workspace_id: Annotated[
2391        str | None,
2392        Field(
2393            description=WORKSPACE_ID_TIP_TEXT,
2394            default=None,
2395        ),
2396    ],
2397) -> str:
2398    """Update a connection's settings on Airbyte Cloud.
2399
2400    This tool allows updating multiple connection settings in a single call:
2401    - Enable or disable the connection
2402    - Set a cron schedule for automatic syncs
2403    - Switch to manual scheduling (no automatic syncs)
2404
2405    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
2406    parameters are mutually exclusive.
2407    """
2408    check_guid_created_in_session(connection_id)
2409
2410    # Validate that at least one setting is provided
2411    if enabled is None and cron_expression is None and manual_schedule is None:
2412        raise ValueError(
2413            "At least one setting must be provided: 'enabled', 'cron_expression', "
2414            "or 'manual_schedule'."
2415        )
2416
2417    # Validate mutually exclusive schedule options
2418    if cron_expression is not None and manual_schedule is True:
2419        raise ValueError(
2420            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
2421            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
2422            "for manual-only syncs."
2423        )
2424
2425    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2426    connection = workspace.get_connection(connection_id=connection_id)
2427
2428    changes_made: list[str] = []
2429
2430    # Apply enabled status change
2431    if enabled is not None:
2432        connection.set_enabled(enabled=enabled)
2433        status_str = "enabled" if enabled else "disabled"
2434        changes_made.append(f"status set to '{status_str}'")
2435
2436    # Apply schedule change
2437    if cron_expression is not None:
2438        connection.set_schedule(cron_expression=cron_expression)
2439        changes_made.append(f"schedule set to '{cron_expression}'")
2440    elif manual_schedule is True:
2441        connection.set_manual_schedule()
2442        changes_made.append("schedule set to 'manual'")
2443
2444    changes_summary = ", ".join(changes_made)
2445    return (
2446        f"Successfully updated connection '{connection_id}': {changes_summary}. "
2447        f"URL: {connection.connection_url}"
2448    )

Update a connection's settings on Airbyte Cloud.

This tool allows updating multiple connection settings in a single call:

  • Enable or disable the connection
  • Set a cron schedule for automatic syncs
  • Switch to manual scheduling (no automatic syncs)

At least one setting must be provided. The 'cron_expression' and 'manual_schedule' parameters are mutually exclusive.
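
A sketch that enables a connection and puts it on a daily schedule in a single call. The connection ID is a placeholder, and because the keyword-only parameters carry their defaults only in the Field metadata, each one is passed explicitly when calling the function directly (same assumptions as the earlier examples).

    from airbyte.mcp.cloud_ops import update_cloud_connection

    print(
        update_cloud_connection(
            connection_id="11111111-2222-3333-4444-555555555555",  # placeholder
            enabled=True,                 # set status to 'active'
            cron_expression="0 0 * * *",  # daily at midnight UTC
            manual_schedule=None,         # mutually exclusive with cron_expression
            workspace_id=None,            # defaults to AIRBYTE_CLOUD_WORKSPACE_ID
        )
    )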

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_connection_artifact( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], artifact_type: Annotated[Literal['state', 'catalog'], FieldInfo(annotation=NoneType, required=True, description="The type of artifact to retrieve: 'state' or 'catalog'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
2451@mcp_tool(
2452    domain="cloud",
2453    read_only=True,
2454    idempotent=True,
2455    open_world=True,
2456    extra_help_text=CLOUD_AUTH_TIP_TEXT,
2457)
2458def get_connection_artifact(
2459    connection_id: Annotated[
2460        str,
2461        Field(description="The ID of the Airbyte Cloud connection."),
2462    ],
2463    artifact_type: Annotated[
2464        Literal["state", "catalog"],
2465        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
2466    ],
2467    *,
2468    workspace_id: Annotated[
2469        str | None,
2470        Field(
2471            description=WORKSPACE_ID_TIP_TEXT,
2472            default=None,
2473        ),
2474    ],
2475) -> dict[str, Any] | list[dict[str, Any]]:
2476    """Get a connection artifact (state or catalog) from Airbyte Cloud.
2477
2478    Retrieves the specified artifact for a connection:
2479    - 'state': Returns the persisted state for incremental syncs as a list of
2480      stream state objects, or {"ERROR": "..."} if no state is set.
2481    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
2482      or {"ERROR": "..."} if not found.
2483    """
2484    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
2485    connection = workspace.get_connection(connection_id=connection_id)
2486
2487    if artifact_type == "state":
2488        result = connection.get_state_artifacts()
2489        if result is None:
2490            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
2491        return result
2492
2493    # artifact_type == "catalog"
2494    result = connection.get_catalog_artifact()
2495    if result is None:
2496        return {"ERROR": "No catalog found for this connection"}
2497    return result

Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:

  • 'state': Returns the persisted state for incremental syncs as a list of stream state objects, or {"ERROR": "..."} if no state is set.
  • 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.
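
A read-only sketch that fetches both artifact types for a placeholder connection, under the same direct-call assumptions as the earlier examples. Note that a missing artifact is reported as an {"ERROR": ...} dict rather than raised as an exception.

    from airbyte.mcp.cloud_ops import get_connection_artifact

    CONNECTION_ID = "11111111-2222-3333-4444-555555555555"  # placeholder

    state = get_connection_artifact(
        connection_id=CONNECTION_ID,
        artifact_type="state",    # list of per-stream state objects, or an ERROR dict if unset
        workspace_id=None,
    )
    catalog = get_connection_artifact(
        connection_id=CONNECTION_ID,
        artifact_type="catalog",  # the configured catalog (syncCatalog), or an ERROR dict
        workspace_id=None,
    )
    print(state)
    print(catalog)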