airbyte.mcp.cloud_ops

Airbyte Cloud MCP operations.

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any
   6
   7from fastmcp import FastMCP
   8from pydantic import Field
   9
  10from airbyte import cloud, get_destination, get_source
  11from airbyte.cloud.auth import (
  12    resolve_cloud_api_url,
  13    resolve_cloud_client_id,
  14    resolve_cloud_client_secret,
  15    resolve_cloud_workspace_id,
  16)
  17from airbyte.cloud.connections import CloudConnection
  18from airbyte.cloud.connectors import CloudDestination, CloudSource, CustomCloudSourceDefinition
  19from airbyte.cloud.workspaces import CloudWorkspace
  20from airbyte.destinations.util import get_noop_destination
  21from airbyte.mcp._tool_utils import (
  22    check_guid_created_in_session,
  23    mcp_tool,
  24    register_guid_created_in_session,
  25    register_tools,
  26)
  27from airbyte.mcp._util import resolve_config, resolve_list_of_strings
  28
  29
  30def _get_cloud_workspace() -> CloudWorkspace:
  31    """Get an authenticated CloudWorkspace using environment variables."""
  32    return CloudWorkspace(
  33        workspace_id=resolve_cloud_workspace_id(),
  34        client_id=resolve_cloud_client_id(),
  35        client_secret=resolve_cloud_client_secret(),
  36        api_root=resolve_cloud_api_url(),
  37    )
  38
  39
  40@mcp_tool(
  41    domain="cloud",
  42    open_world=True,
  43)
  44def deploy_source_to_cloud(
  45    source_name: Annotated[
  46        str,
  47        Field(description="The name to use when deploying the source."),
  48    ],
  49    source_connector_name: Annotated[
  50        str,
  51        Field(description="The name of the source connector (e.g., 'source-faker')."),
  52    ],
  53    *,
  54    config: Annotated[
  55        dict | str | None,
  56        Field(
  57            description="The configuration for the source connector.",
  58            default=None,
  59        ),
  60    ],
  61    config_secret_name: Annotated[
  62        str | None,
  63        Field(
  64            description="The name of the secret containing the configuration.",
  65            default=None,
  66        ),
  67    ],
  68    unique: Annotated[
  69        bool,
  70        Field(
  71            description="Whether to require a unique name.",
  72            default=True,
  73        ),
  74    ],
  75) -> str:
  76    """Deploy a source connector to Airbyte Cloud.
  77
  78    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
  79    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
  80    Airbyte Cloud API.
  81    """
  82    try:
  83        source = get_source(
  84            source_connector_name,
  85            install_if_missing=False,
  86        )
  87        config_dict = resolve_config(
  88            config=config,
  89            config_secret_name=config_secret_name,
  90            config_spec_jsonschema=source.config_spec,
  91        )
  92        source.set_config(config_dict)
  93
  94        workspace: CloudWorkspace = _get_cloud_workspace()
  95        deployed_source = workspace.deploy_source(
  96            name=source_name,
  97            source=source,
  98            unique=unique,
  99        )
 100
 101    except Exception as ex:
 102        return f"Failed to deploy source '{source_name}': {ex}"
 103    else:
 104        register_guid_created_in_session(deployed_source.connector_id)
 105        return (
 106            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 107            f" and URL: {deployed_source.connector_url}"
 108        )
 109
 110
 111@mcp_tool(
 112    domain="cloud",
 113    open_world=True,
 114)
 115def deploy_destination_to_cloud(
 116    destination_name: Annotated[
 117        str,
 118        Field(description="The name to use when deploying the destination."),
 119    ],
 120    destination_connector_name: Annotated[
 121        str,
 122        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 123    ],
 124    *,
 125    config: Annotated[
 126        dict | str | None,
 127        Field(
 128            description="The configuration for the destination connector.",
 129            default=None,
 130        ),
 131    ],
 132    config_secret_name: Annotated[
 133        str | None,
 134        Field(
 135            description="The name of the secret containing the configuration.",
 136            default=None,
 137        ),
 138    ],
 139    unique: Annotated[
 140        bool,
 141        Field(
 142            description="Whether to require a unique name.",
 143            default=True,
 144        ),
 145    ],
 146) -> str:
 147    """Deploy a destination connector to Airbyte Cloud.
 148
 149    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 150    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 151    Airbyte Cloud API.
 152    """
 153    try:
 154        destination = get_destination(
 155            destination_connector_name,
 156            install_if_missing=False,
 157        )
 158        config_dict = resolve_config(
 159            config=config,
 160            config_secret_name=config_secret_name,
 161            config_spec_jsonschema=destination.config_spec,
 162        )
 163        destination.set_config(config_dict)
 164
 165        workspace: CloudWorkspace = _get_cloud_workspace()
 166        deployed_destination = workspace.deploy_destination(
 167            name=destination_name,
 168            destination=destination,
 169            unique=unique,
 170        )
 171
 172    except Exception as ex:
 173        return f"Failed to deploy destination '{destination_name}': {ex}"
 174    else:
 175        register_guid_created_in_session(deployed_destination.connector_id)
 176        return (
 177            f"Successfully deployed destination '{destination_name}' "
 178            f"with ID: {deployed_destination.connector_id}"
 179        )
 180
 181
 182@mcp_tool(
 183    domain="cloud",
 184    open_world=True,
 185)
 186def create_connection_on_cloud(
 187    connection_name: Annotated[
 188        str,
 189        Field(description="The name of the connection."),
 190    ],
 191    source_id: Annotated[
 192        str,
 193        Field(description="The ID of the deployed source."),
 194    ],
 195    destination_id: Annotated[
 196        str,
 197        Field(description="The ID of the deployed destination."),
 198    ],
 199    selected_streams: Annotated[
 200        str | list[str],
 201        Field(
 202            description=(
 203                "The selected stream names to sync within the connection. "
 204                "Must be an explicit stream name or list of streams. "
 205                "Cannot be empty or '*'."
 206            )
 207        ),
 208    ],
 209    table_prefix: Annotated[
 210        str | None,
 211        Field(
 212            description="Optional table prefix to use when syncing to the destination.",
 213            default=None,
 214        ),
 215    ],
 216) -> str:
 217    """Create a connection between a deployed source and destination on Airbyte Cloud.
 218
 219    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 220    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 221    Airbyte Cloud API.
 222    """
 223    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 224    try:
 225        workspace: CloudWorkspace = _get_cloud_workspace()
 226        deployed_connection = workspace.deploy_connection(
 227            connection_name=connection_name,
 228            source=source_id,
 229            destination=destination_id,
 230            selected_streams=resolved_streams_list,
 231            table_prefix=table_prefix,
 232        )
 233
 234    except Exception as ex:
 235        return f"Failed to create connection '{connection_name}': {ex}"
 236    else:
 237        register_guid_created_in_session(deployed_connection.connection_id)
 238        return (
 239            f"Successfully created connection '{connection_name}' "
 240            f"with ID '{deployed_connection.connection_id}' and "
 241            f"URL: {deployed_connection.connection_url}"
 242        )
 243
 244
 245@mcp_tool(
 246    domain="cloud",
 247    open_world=True,
 248)
 249def run_cloud_sync(
 250    connection_id: Annotated[
 251        str,
 252        Field(description="The ID of the Airbyte Cloud connection."),
 253    ],
 254    *,
 255    wait: Annotated[
 256        bool,
 257        Field(
 258            description="Whether to wait for the sync to complete.",
 259            default=True,
 260        ),
 261    ],
 262    wait_timeout: Annotated[
 263        int,
 264        Field(
 265            description="Maximum time to wait for sync completion (seconds).",
 266            default=300,
 267        ),
 268    ],
 269) -> str:
 270    """Run a sync job on Airbyte Cloud.
 271
 272    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 273    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 274    Airbyte Cloud API.
 275    """
 276    try:
 277        workspace: CloudWorkspace = _get_cloud_workspace()
 278        connection = workspace.get_connection(connection_id=connection_id)
 279        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 280
 281    except Exception as ex:
 282        return f"Failed to run sync for connection '{connection_id}': {ex}"
 283    else:
 284        if wait:
 285            status = sync_result.get_job_status()
 286            return (
 287                f"Sync completed with status: {status}. "  # Sync completed.
 288                f"Job ID is '{sync_result.job_id}' and "
 289                f"job URL is: {sync_result.job_url}"
 290            )
 291        return (
 292            f"Sync started. "  # Sync started.
 293            f"Job ID is '{sync_result.job_id}' and "
 294            f"job URL is: {sync_result.job_url}"
 295        )
 296
 297
 298@mcp_tool(
 299    domain="cloud",
 300    read_only=True,
 301    idempotent=True,
 302    open_world=True,
 303)
 304def check_airbyte_cloud_workspace() -> str:
 305    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 306
 307    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 308    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 309    Airbyte Cloud API.
 310
 311    Returns workspace ID and workspace URL for verification.
 312    """
 313    try:
 314        workspace: CloudWorkspace = _get_cloud_workspace()
 315        workspace.connect()
 316
 317    except Exception as ex:
 318        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
 319    else:
 320        return (
 321            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
 322            f"Workspace ID: {workspace.workspace_id}\n"
 323            f"Workspace URL: {workspace.workspace_url}"
 324        )
 325
 326
 327@mcp_tool(
 328    domain="cloud",
 329    open_world=True,
 330)
 331def deploy_noop_destination_to_cloud(
 332    name: str = "No-op Destination",
 333    *,
 334    unique: bool = True,
 335) -> str:
 336    """Deploy the No-op destination to Airbyte Cloud for testing purposes.
 337
 338    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 339    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 340    Airbyte Cloud API.
 341    """
 342    try:
 343        destination = get_noop_destination()
 344        workspace: CloudWorkspace = _get_cloud_workspace()
 345        deployed_destination = workspace.deploy_destination(
 346            name=name,
 347            destination=destination,
 348            unique=unique,
 349        )
 350    except Exception as ex:
 351        return f"Failed to deploy No-op Destination: {ex}"
 352    else:
 353        register_guid_created_in_session(deployed_destination.connector_id)
 354        return (
 355            f"Successfully deployed No-op Destination "
 356            f"with ID '{deployed_destination.connector_id}' and "
 357            f"URL: {deployed_destination.connector_url}"
 358        )
 359
 360
 361@mcp_tool(
 362    domain="cloud",
 363    read_only=True,
 364    idempotent=True,
 365    open_world=True,
 366)
 367def get_cloud_sync_status(
 368    connection_id: Annotated[
 369        str,
 370        Field(
 371            description="The ID of the Airbyte Cloud connection.",
 372        ),
 373    ],
 374    job_id: Annotated[
 375        int | None,
 376        Field(
 377            description="Optional job ID. If not provided, the latest job will be used.",
 378            default=None,
 379        ),
 380    ],
 381    *,
 382    include_attempts: Annotated[
 383        bool,
 384        Field(
 385            description="Whether to include detailed attempts information.",
 386            default=False,
 387        ),
 388    ],
 389) -> dict[str, Any]:
 390    """Get the status of a sync job from the Airbyte Cloud.
 391
 392    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 393    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 394    Airbyte Cloud API.
 395    """
 396    try:
 397        workspace: CloudWorkspace = _get_cloud_workspace()
 398        connection = workspace.get_connection(connection_id=connection_id)
 399
 400        # If a job ID is provided, get the job by ID.
 401        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 402
 403        if not sync_result:
 404            return {"status": None, "job_id": None, "attempts": []}
 405
 406        result = {
 407            "status": sync_result.get_job_status(),
 408            "job_id": sync_result.job_id,
 409            "bytes_synced": sync_result.bytes_synced,
 410            "records_synced": sync_result.records_synced,
 411            "start_time": sync_result.start_time.isoformat(),
 412            "job_url": sync_result.job_url,
 413            "attempts": [],
 414        }
 415
 416        if include_attempts:
 417            attempts = sync_result.get_attempts()
 418            result["attempts"] = [
 419                {
 420                    "attempt_number": attempt.attempt_number,
 421                    "attempt_id": attempt.attempt_id,
 422                    "status": attempt.status,
 423                    "bytes_synced": attempt.bytes_synced,
 424                    "records_synced": attempt.records_synced,
 425                    "created_at": attempt.created_at.isoformat(),
 426                }
 427                for attempt in attempts
 428            ]
 429
 430        return result  # noqa: TRY300
 431
 432    except Exception as ex:
 433        return {
 434            "status": None,
 435            "job_id": job_id,
 436            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
 437            "attempts": [],
 438        }
 439
 440
 441@mcp_tool(
 442    domain="cloud",
 443    read_only=True,
 444    idempotent=True,
 445    open_world=True,
 446)
 447def list_deployed_cloud_source_connectors() -> list[CloudSource]:
 448    """List all deployed source connectors in the Airbyte Cloud workspace.
 449
 450    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 451    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 452    Airbyte Cloud API.
 453    """
 454    workspace: CloudWorkspace = _get_cloud_workspace()
 455    return workspace.list_sources()
 456
 457
 458@mcp_tool(
 459    domain="cloud",
 460    read_only=True,
 461    idempotent=True,
 462    open_world=True,
 463)
 464def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
 465    """List all deployed destination connectors in the Airbyte Cloud workspace.
 466
 467    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 468    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 469    Airbyte Cloud API.
 470    """
 471    workspace: CloudWorkspace = _get_cloud_workspace()
 472    return workspace.list_destinations()
 473
 474
 475@mcp_tool(
 476    domain="cloud",
 477    read_only=True,
 478    idempotent=True,
 479    open_world=True,
 480)
 481def get_cloud_sync_logs(
 482    connection_id: Annotated[
 483        str,
 484        Field(description="The ID of the Airbyte Cloud connection."),
 485    ],
 486    job_id: Annotated[
 487        int | None,
 488        Field(description="Optional job ID. If not provided, the latest job will be used."),
 489    ] = None,
 490    attempt_number: Annotated[
 491        int | None,
 492        Field(
 493            description="Optional attempt number. If not provided, the latest attempt will be used."
 494        ),
 495    ] = None,
 496) -> str:
 497    """Get the logs from a sync job attempt on Airbyte Cloud.
 498
 499    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 500    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 501    Airbyte Cloud API.
 502    """
 503    try:
 504        workspace: CloudWorkspace = _get_cloud_workspace()
 505        connection = workspace.get_connection(connection_id=connection_id)
 506
 507        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 508
 509        if not sync_result:
 510            return f"No sync job found for connection '{connection_id}'"
 511
 512        attempts = sync_result.get_attempts()
 513
 514        if not attempts:
 515            return f"No attempts found for job '{sync_result.job_id}'"
 516
 517        if attempt_number is not None:
 518            target_attempt = None
 519            for attempt in attempts:
 520                if attempt.attempt_number == attempt_number:
 521                    target_attempt = attempt
 522                    break
 523
 524            if target_attempt is None:
 525                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
 526        else:
 527            target_attempt = max(attempts, key=lambda a: a.attempt_number)
 528
 529        logs = target_attempt.get_full_log_text()
 530
 531        if not logs:
 532            return (
 533                f"No logs available for job '{sync_result.job_id}', "
 534                f"attempt {target_attempt.attempt_number}"
 535            )
 536
 537        return logs  # noqa: TRY300
 538
 539    except Exception as ex:
 540        return f"Failed to get logs for connection '{connection_id}': {ex}"
 541
 542
 543@mcp_tool(
 544    domain="cloud",
 545    read_only=True,
 546    idempotent=True,
 547    open_world=True,
 548)
 549def list_deployed_cloud_connections() -> list[CloudConnection]:
 550    """List all deployed connections in the Airbyte Cloud workspace.
 551
 552    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 553    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 554    Airbyte Cloud API.
 555    """
 556    workspace: CloudWorkspace = _get_cloud_workspace()
 557    return workspace.list_connections()
 558
 559
 560def _get_custom_source_definition_description(
 561    custom_source: CustomCloudSourceDefinition,
 562) -> str:
 563    return "\n".join(
 564        [
 565            f" - Custom Source Name: {custom_source.name}",
 566            f" - Definition ID: {custom_source.definition_id}",
 567            f" - Definition Version: {custom_source.version}",
 568            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
 569            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
 570        ]
 571    )
 572
 573
 574@mcp_tool(
 575    domain="cloud",
 576    open_world=True,
 577)
 578def publish_custom_source_definition(
 579    name: Annotated[
 580        str,
 581        Field(description="The name for the custom connector definition."),
 582    ],
 583    *,
 584    manifest_yaml: Annotated[
 585        str | Path | None,
 586        Field(
 587            description=(
 588                "The Low-code CDK manifest as a YAML string or file path. "
 589                "Required for YAML connectors."
 590            ),
 591            default=None,
 592        ),
 593    ] = None,
 594    unique: Annotated[
 595        bool,
 596        Field(
 597            description="Whether to require a unique name.",
 598            default=True,
 599        ),
 600    ] = True,
 601    pre_validate: Annotated[
 602        bool,
 603        Field(
 604            description="Whether to validate the manifest client-side before publishing.",
 605            default=True,
 606        ),
 607    ] = True,
 608) -> str:
 609    """Publish a custom YAML source connector definition to Airbyte Cloud.
 610
 611    Note: Only YAML (declarative) connectors are currently supported.
 612    Docker-based custom sources are not yet available.
 613    """
 614    try:
 615        processed_manifest = manifest_yaml
 616        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
 617            processed_manifest = Path(manifest_yaml)
 618
 619        workspace: CloudWorkspace = _get_cloud_workspace()
 620        custom_source = workspace.publish_custom_source_definition(
 621            name=name,
 622            manifest_yaml=processed_manifest,
 623            unique=unique,
 624            pre_validate=pre_validate,
 625        )
 626    except Exception as ex:
 627        return f"Failed to publish custom source definition '{name}': {ex}"
 628    else:
 629        register_guid_created_in_session(custom_source.definition_id)
 630        return (
 631            "Successfully published custom YAML source definition:\n"
 632            + _get_custom_source_definition_description(
 633                custom_source=custom_source,
 634            )
 635            + "\n"
 636        )
 637
 638
 639@mcp_tool(
 640    domain="cloud",
 641    read_only=True,
 642    idempotent=True,
 643    open_world=True,
 644)
 645def list_custom_source_definitions() -> list[dict[str, Any]]:
 646    """List custom YAML source definitions in the Airbyte Cloud workspace.
 647
 648    Note: Only YAML (declarative) connectors are currently supported.
 649    Docker-based custom sources are not yet available.
 650    """
 651    workspace: CloudWorkspace = _get_cloud_workspace()
 652    definitions = workspace.list_custom_source_definitions(
 653        definition_type="yaml",
 654    )
 655
 656    return [
 657        {
 658            "definition_id": d.definition_id,
 659            "name": d.name,
 660            "version": d.version,
 661            "connector_builder_project_url": d.connector_builder_project_url,
 662        }
 663        for d in definitions
 664    ]
 665
 666
 667@mcp_tool(
 668    domain="cloud",
 669    destructive=True,
 670    open_world=True,
 671)
 672def update_custom_source_definition(
 673    definition_id: Annotated[
 674        str,
 675        Field(description="The ID of the definition to update."),
 676    ],
 677    manifest_yaml: Annotated[
 678        str | Path,
 679        Field(
 680            description="New manifest as YAML string or file path.",
 681        ),
 682    ],
 683    *,
 684    pre_validate: Annotated[
 685        bool,
 686        Field(
 687            description="Whether to validate the manifest client-side before updating.",
 688            default=True,
 689        ),
 690    ] = True,
 691) -> str:
 692    """Update a custom YAML source definition in Airbyte Cloud.
 693
 694    Note: Only YAML (declarative) connectors are currently supported.
 695    Docker-based custom sources are not yet available.
 696    """
 697    check_guid_created_in_session(definition_id)
 698    try:
 699        processed_manifest = manifest_yaml
 700        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
 701            processed_manifest = Path(manifest_yaml)
 702
 703        workspace: CloudWorkspace = _get_cloud_workspace()
 704        definition = workspace.get_custom_source_definition(
 705            definition_id=definition_id,
 706            definition_type="yaml",
 707        )
 708        custom_source: CustomCloudSourceDefinition = definition.update_definition(
 709            manifest_yaml=processed_manifest,
 710            pre_validate=pre_validate,
 711        )
 712    except Exception as ex:
 713        return f"Failed to update custom source definition '{definition_id}': {ex}"
 714    else:
 715        return (
 716            "Successfully updated custom YAML source definition:\n"
 717            + _get_custom_source_definition_description(
 718                custom_source=custom_source,
 719            )
 720        )
 721
 722
 723@mcp_tool(
 724    domain="cloud",
 725    destructive=True,
 726    open_world=True,
 727)
 728def permanently_delete_custom_source_definition(
 729    definition_id: Annotated[
 730        str,
 731        Field(description="The ID of the custom source definition to delete."),
 732    ],
 733) -> str:
 734    """Permanently delete a custom YAML source definition from Airbyte Cloud.
 735
 736    IMPORTANT: This operation requires the connector name to either:
 737    1. Start with "delete:" (case insensitive), OR
 738    2. Contain "delete-me" (case insensitive)
 739
 740    If the connector does not meet these requirements, the deletion will be rejected with a
 741    helpful error message. Instruct the user to rename the connector appropriately to authorize
 742    the deletion.
 743
 744    Note: Only YAML (declarative) connectors are currently supported.
 745    Docker-based custom sources are not yet available.
 746    """
 747    check_guid_created_in_session(definition_id)
 748    workspace: CloudWorkspace = _get_cloud_workspace()
 749    definition = workspace.get_custom_source_definition(
 750        definition_id=definition_id,
 751        definition_type="yaml",
 752    )
 753    definition_name: str = definition.name  # Capture name before deletion
 754    definition.permanently_delete(
 755        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
 756    )
 757    return (
 758        f"Successfully deleted custom source definition '{definition_name}' (ID: {definition_id})"
 759    )
 760
 761
 762@mcp_tool(
 763    domain="cloud",
 764    open_world=True,
 765)
 766def rename_cloud_source(
 767    source_id: Annotated[
 768        str,
 769        Field(description="The ID of the deployed source to rename."),
 770    ],
 771    name: Annotated[
 772        str,
 773        Field(description="New name for the source."),
 774    ],
 775) -> str:
 776    """Rename a deployed source connector on Airbyte Cloud.
 777
 778    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 779    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 780    Airbyte Cloud API.
 781    """
 782    workspace: CloudWorkspace = _get_cloud_workspace()
 783    source = workspace.get_source(source_id=source_id)
 784    source.rename(name=name)
 785    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
 786
 787
 788@mcp_tool(
 789    domain="cloud",
 790    destructive=True,
 791    open_world=True,
 792)
 793def update_cloud_source_config(
 794    source_id: Annotated[
 795        str,
 796        Field(description="The ID of the deployed source to update."),
 797    ],
 798    config: Annotated[
 799        dict | str,
 800        Field(
 801            description="New configuration for the source connector.",
 802        ),
 803    ],
 804    config_secret_name: Annotated[
 805        str | None,
 806        Field(
 807            description="The name of the secret containing the configuration.",
 808            default=None,
 809        ),
 810    ] = None,
 811) -> str:
 812    """Update a deployed source connector's configuration on Airbyte Cloud.
 813
 814    This is a destructive operation that can break existing connections if the
 815    configuration is changed incorrectly. Use with caution.
 816
 817    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 818    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 819    Airbyte Cloud API.
 820    """
 821    check_guid_created_in_session(source_id)
 822    workspace: CloudWorkspace = _get_cloud_workspace()
 823    source = workspace.get_source(source_id=source_id)
 824
 825    config_dict = resolve_config(
 826        config=config,
 827        config_secret_name=config_secret_name,
 828        config_spec_jsonschema=None,  # We don't have the spec here
 829    )
 830
 831    source.update_config(config=config_dict)
 832    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
 833
 834
 835@mcp_tool(
 836    domain="cloud",
 837    open_world=True,
 838)
 839def rename_cloud_destination(
 840    destination_id: Annotated[
 841        str,
 842        Field(description="The ID of the deployed destination to rename."),
 843    ],
 844    name: Annotated[
 845        str,
 846        Field(description="New name for the destination."),
 847    ],
 848) -> str:
 849    """Rename a deployed destination connector on Airbyte Cloud.
 850
 851    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 852    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 853    Airbyte Cloud API.
 854    """
 855    workspace: CloudWorkspace = _get_cloud_workspace()
 856    destination = workspace.get_destination(destination_id=destination_id)
 857    destination.rename(name=name)
 858    return (
 859        f"Successfully renamed destination '{destination_id}' to '{name}'. "
 860        f"URL: {destination.connector_url}"
 861    )
 862
 863
 864@mcp_tool(
 865    domain="cloud",
 866    destructive=True,
 867    open_world=True,
 868)
 869def update_cloud_destination_config(
 870    destination_id: Annotated[
 871        str,
 872        Field(description="The ID of the deployed destination to update."),
 873    ],
 874    config: Annotated[
 875        dict | str,
 876        Field(
 877            description="New configuration for the destination connector.",
 878        ),
 879    ],
 880    config_secret_name: Annotated[
 881        str | None,
 882        Field(
 883            description="The name of the secret containing the configuration.",
 884            default=None,
 885        ),
 886    ] = None,
 887) -> str:
 888    """Update a deployed destination connector's configuration on Airbyte Cloud.
 889
 890    This is a destructive operation that can break existing connections if the
 891    configuration is changed incorrectly. Use with caution.
 892
 893    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 894    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 895    Airbyte Cloud API.
 896    """
 897    check_guid_created_in_session(destination_id)
 898    workspace: CloudWorkspace = _get_cloud_workspace()
 899    destination = workspace.get_destination(destination_id=destination_id)
 900
 901    config_dict = resolve_config(
 902        config=config,
 903        config_secret_name=config_secret_name,
 904        config_spec_jsonschema=None,  # We don't have the spec here
 905    )
 906
 907    destination.update_config(config=config_dict)
 908    return (
 909        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
 910    )
 911
 912
 913@mcp_tool(
 914    domain="cloud",
 915    open_world=True,
 916)
 917def rename_cloud_connection(
 918    connection_id: Annotated[
 919        str,
 920        Field(description="The ID of the connection to rename."),
 921    ],
 922    name: Annotated[
 923        str,
 924        Field(description="New name for the connection."),
 925    ],
 926) -> str:
 927    """Rename a connection on Airbyte Cloud.
 928
 929    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 930    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 931    Airbyte Cloud API.
 932    """
 933    workspace: CloudWorkspace = _get_cloud_workspace()
 934    connection = workspace.get_connection(connection_id=connection_id)
 935    connection.rename(name=name)
 936    return (
 937        f"Successfully renamed connection '{connection_id}' to '{name}'. "
 938        f"URL: {connection.connection_url}"
 939    )
 940
 941
 942@mcp_tool(
 943    domain="cloud",
 944    destructive=True,
 945    open_world=True,
 946)
 947def set_cloud_connection_table_prefix(
 948    connection_id: Annotated[
 949        str,
 950        Field(description="The ID of the connection to update."),
 951    ],
 952    prefix: Annotated[
 953        str,
 954        Field(description="New table prefix to use when syncing to the destination."),
 955    ],
 956) -> str:
 957    """Set the table prefix for a connection on Airbyte Cloud.
 958
 959    This is a destructive operation that can break downstream dependencies if the
 960    table prefix is changed incorrectly. Use with caution.
 961
 962    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 963    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 964    Airbyte Cloud API.
 965    """
 966    check_guid_created_in_session(connection_id)
 967    workspace: CloudWorkspace = _get_cloud_workspace()
 968    connection = workspace.get_connection(connection_id=connection_id)
 969    connection.set_table_prefix(prefix=prefix)
 970    return (
 971        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
 972        f"URL: {connection.connection_url}"
 973    )
 974
 975
 976@mcp_tool(
 977    domain="cloud",
 978    destructive=True,
 979    open_world=True,
 980)
 981def set_cloud_connection_selected_streams(
 982    connection_id: Annotated[
 983        str,
 984        Field(description="The ID of the connection to update."),
 985    ],
 986    stream_names: Annotated[
 987        str | list[str],
 988        Field(
 989            description=(
 990                "The selected stream names to sync within the connection. "
 991                "Must be an explicit stream name or list of streams."
 992            )
 993        ),
 994    ],
 995) -> str:
 996    """Set the selected streams for a connection on Airbyte Cloud.
 997
 998    This is a destructive operation that can break existing connections if the
 999    stream selection is changed incorrectly. Use with caution.
1000
1001    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
1002    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
1003    Airbyte Cloud API.
1004    """
1005    check_guid_created_in_session(connection_id)
1006    workspace: CloudWorkspace = _get_cloud_workspace()
1007    connection = workspace.get_connection(connection_id=connection_id)
1008
1009    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
1010    connection.set_selected_streams(stream_names=resolved_streams_list)
1011
1012    return (
1013        f"Successfully set selected streams for connection '{connection_id}' "
1014        f"to {resolved_streams_list}. URL: {connection.connection_url}"
1015    )
1016
1017
1018def register_cloud_ops_tools(app: FastMCP) -> None:
1019    """@private Register tools with the FastMCP app.
1020
1021    This is an internal function and should not be called directly.
1022
1023    Tools are filtered based on mode settings:
1024    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
1025    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
1026      operations are protected by runtime session checks
1027    """
1028    register_tools(app, domain="cloud")
@mcp_tool(domain='cloud', open_world=True)
def deploy_source_to_cloud( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')], source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")], *, config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
 41@mcp_tool(
 42    domain="cloud",
 43    open_world=True,
 44)
 45def deploy_source_to_cloud(
 46    source_name: Annotated[
 47        str,
 48        Field(description="The name to use when deploying the source."),
 49    ],
 50    source_connector_name: Annotated[
 51        str,
 52        Field(description="The name of the source connector (e.g., 'source-faker')."),
 53    ],
 54    *,
 55    config: Annotated[
 56        dict | str | None,
 57        Field(
 58            description="The configuration for the source connector.",
 59            default=None,
 60        ),
 61    ],
 62    config_secret_name: Annotated[
 63        str | None,
 64        Field(
 65            description="The name of the secret containing the configuration.",
 66            default=None,
 67        ),
 68    ],
 69    unique: Annotated[
 70        bool,
 71        Field(
 72            description="Whether to require a unique name.",
 73            default=True,
 74        ),
 75    ],
 76) -> str:
 77    """Deploy a source connector to Airbyte Cloud.
 78
 79    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
 80    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
 81    Airbyte Cloud API.
 82    """
 83    try:
 84        source = get_source(
 85            source_connector_name,
 86            install_if_missing=False,
 87        )
 88        config_dict = resolve_config(
 89            config=config,
 90            config_secret_name=config_secret_name,
 91            config_spec_jsonschema=source.config_spec,
 92        )
 93        source.set_config(config_dict)
 94
 95        workspace: CloudWorkspace = _get_cloud_workspace()
 96        deployed_source = workspace.deploy_source(
 97            name=source_name,
 98            source=source,
 99            unique=unique,
100        )
101
102    except Exception as ex:
103        return f"Failed to deploy source '{source_name}': {ex}"
104    else:
105        register_guid_created_in_session(deployed_source.connector_id)
106        return (
107            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
108            f" and URL: {deployed_source.connector_url}"
109        )

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def deploy_destination_to_cloud( destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')], destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")], *, config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
112@mcp_tool(
113    domain="cloud",
114    open_world=True,
115)
116def deploy_destination_to_cloud(
117    destination_name: Annotated[
118        str,
119        Field(description="The name to use when deploying the destination."),
120    ],
121    destination_connector_name: Annotated[
122        str,
123        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
124    ],
125    *,
126    config: Annotated[
127        dict | str | None,
128        Field(
129            description="The configuration for the destination connector.",
130            default=None,
131        ),
132    ],
133    config_secret_name: Annotated[
134        str | None,
135        Field(
136            description="The name of the secret containing the configuration.",
137            default=None,
138        ),
139    ],
140    unique: Annotated[
141        bool,
142        Field(
143            description="Whether to require a unique name.",
144            default=True,
145        ),
146    ],
147) -> str:
148    """Deploy a destination connector to Airbyte Cloud.
149
150    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
151    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
152    Airbyte Cloud API.
153    """
154    try:
155        destination = get_destination(
156            destination_connector_name,
157            install_if_missing=False,
158        )
159        config_dict = resolve_config(
160            config=config,
161            config_secret_name=config_secret_name,
162            config_spec_jsonschema=destination.config_spec,
163        )
164        destination.set_config(config_dict)
165
166        workspace: CloudWorkspace = _get_cloud_workspace()
167        deployed_destination = workspace.deploy_destination(
168            name=destination_name,
169            destination=destination,
170            unique=unique,
171        )
172
173    except Exception as ex:
174        return f"Failed to deploy destination '{destination_name}': {ex}"
175    else:
176        register_guid_created_in_session(deployed_destination.connector_id)
177        return (
178            f"Successfully deployed destination '{destination_name}' "
179            f"with ID: {deployed_destination.connector_id}"
180        )

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def create_connection_on_cloud( connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')], source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')], destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')], selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")], table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')]) -> str:
183@mcp_tool(
184    domain="cloud",
185    open_world=True,
186)
187def create_connection_on_cloud(
188    connection_name: Annotated[
189        str,
190        Field(description="The name of the connection."),
191    ],
192    source_id: Annotated[
193        str,
194        Field(description="The ID of the deployed source."),
195    ],
196    destination_id: Annotated[
197        str,
198        Field(description="The ID of the deployed destination."),
199    ],
200    selected_streams: Annotated[
201        str | list[str],
202        Field(
203            description=(
204                "The selected stream names to sync within the connection. "
205                "Must be an explicit stream name or list of streams. "
206                "Cannot be empty or '*'."
207            )
208        ),
209    ],
210    table_prefix: Annotated[
211        str | None,
212        Field(
213            description="Optional table prefix to use when syncing to the destination.",
214            default=None,
215        ),
216    ],
217) -> str:
218    """Create a connection between a deployed source and destination on Airbyte Cloud.
219
220    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
221    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
222    Airbyte Cloud API.
223    """
224    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
225    try:
226        workspace: CloudWorkspace = _get_cloud_workspace()
227        deployed_connection = workspace.deploy_connection(
228            connection_name=connection_name,
229            source=source_id,
230            destination=destination_id,
231            selected_streams=resolved_streams_list,
232            table_prefix=table_prefix,
233        )
234
235    except Exception as ex:
236        return f"Failed to create connection '{connection_name}': {ex}"
237    else:
238        register_guid_created_in_session(deployed_connection.connection_id)
239        return (
240            f"Successfully created connection '{connection_name}' "
241            f"with ID '{deployed_connection.connection_id}' and "
242            f"URL: {deployed_connection.connection_url}"
243        )

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def run_cloud_sync( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to wait for the sync to complete.')], wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')]) -> str:
246@mcp_tool(
247    domain="cloud",
248    open_world=True,
249)
250def run_cloud_sync(
251    connection_id: Annotated[
252        str,
253        Field(description="The ID of the Airbyte Cloud connection."),
254    ],
255    *,
256    wait: Annotated[
257        bool,
258        Field(
259            description="Whether to wait for the sync to complete.",
260            default=True,
261        ),
262    ],
263    wait_timeout: Annotated[
264        int,
265        Field(
266            description="Maximum time to wait for sync completion (seconds).",
267            default=300,
268        ),
269    ],
270) -> str:
271    """Run a sync job on Airbyte Cloud.
272
273    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
274    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
275    Airbyte Cloud API.
276    """
277    try:
278        workspace: CloudWorkspace = _get_cloud_workspace()
279        connection = workspace.get_connection(connection_id=connection_id)
280        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
281
282    except Exception as ex:
283        return f"Failed to run sync for connection '{connection_id}': {ex}"
284    else:
285        if wait:
286            status = sync_result.get_job_status()
287            return (
288                f"Sync completed with status: {status}. "  # Sync completed.
289                f"Job ID is '{sync_result.job_id}' and "
290                f"job URL is: {sync_result.job_url}"
291            )
292        return (
293            f"Sync started. "  # Sync started.
294            f"Job ID is '{sync_result.job_id}' and "
295            f"job URL is: {sync_result.job_url}"
296        )

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def check_airbyte_cloud_workspace() -> str:
299@mcp_tool(
300    domain="cloud",
301    read_only=True,
302    idempotent=True,
303    open_world=True,
304)
305def check_airbyte_cloud_workspace() -> str:
306    """Check if we have a valid Airbyte Cloud connection and return workspace info.
307
308    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
309    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
310    Airbyte Cloud API.
311
312    Returns workspace ID and workspace URL for verification.
313    """
314    try:
315        workspace: CloudWorkspace = _get_cloud_workspace()
316        workspace.connect()
317
318    except Exception as ex:
319        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
320    else:
321        return (
322            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
323            f"Workspace ID: {workspace.workspace_id}\n"
324            f"Workspace URL: {workspace.workspace_url}"
325        )

Check if we have a valid Airbyte Cloud connection and return workspace info.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

Returns workspace ID and workspace URL for verification.

@mcp_tool(domain='cloud', open_world=True)
def deploy_noop_destination_to_cloud(name: str = 'No-op Destination', *, unique: bool = True) -> str:
328@mcp_tool(
329    domain="cloud",
330    open_world=True,
331)
332def deploy_noop_destination_to_cloud(
333    name: str = "No-op Destination",
334    *,
335    unique: bool = True,
336) -> str:
337    """Deploy the No-op destination to Airbyte Cloud for testing purposes.
338
339    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
340    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
341    Airbyte Cloud API.
342    """
343    try:
344        destination = get_noop_destination()
345        workspace: CloudWorkspace = _get_cloud_workspace()
346        deployed_destination = workspace.deploy_destination(
347            name=name,
348            destination=destination,
349            unique=unique,
350        )
351    except Exception as ex:
352        return f"Failed to deploy No-op Destination: {ex}"
353    else:
354        register_guid_created_in_session(deployed_destination.connector_id)
355        return (
356            f"Successfully deployed No-op Destination "
357            f"with ID '{deployed_destination.connector_id}' and "
358            f"URL: {deployed_destination.connector_url}"
359        )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def get_cloud_sync_status( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
362@mcp_tool(
363    domain="cloud",
364    read_only=True,
365    idempotent=True,
366    open_world=True,
367)
368def get_cloud_sync_status(
369    connection_id: Annotated[
370        str,
371        Field(
372            description="The ID of the Airbyte Cloud connection.",
373        ),
374    ],
375    job_id: Annotated[
376        int | None,
377        Field(
378            description="Optional job ID. If not provided, the latest job will be used.",
379            default=None,
380        ),
381    ],
382    *,
383    include_attempts: Annotated[
384        bool,
385        Field(
386            description="Whether to include detailed attempts information.",
387            default=False,
388        ),
389    ],
390) -> dict[str, Any]:
391    """Get the status of a sync job from the Airbyte Cloud.
392
393    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
394    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
395    Airbyte Cloud API.
396    """
397    try:
398        workspace: CloudWorkspace = _get_cloud_workspace()
399        connection = workspace.get_connection(connection_id=connection_id)
400
401        # If a job ID is provided, get the job by ID.
402        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
403
404        if not sync_result:
405            return {"status": None, "job_id": None, "attempts": []}
406
407        result = {
408            "status": sync_result.get_job_status(),
409            "job_id": sync_result.job_id,
410            "bytes_synced": sync_result.bytes_synced,
411            "records_synced": sync_result.records_synced,
412            "start_time": sync_result.start_time.isoformat(),
413            "job_url": sync_result.job_url,
414            "attempts": [],
415        }
416
417        if include_attempts:
418            attempts = sync_result.get_attempts()
419            result["attempts"] = [
420                {
421                    "attempt_number": attempt.attempt_number,
422                    "attempt_id": attempt.attempt_id,
423                    "status": attempt.status,
424                    "bytes_synced": attempt.bytes_synced,
425                    "records_synced": attempt.records_synced,
426                    "created_at": attempt.created_at.isoformat(),
427                }
428                for attempt in attempts
429            ]
430
431        return result  # noqa: TRY300
432
433    except Exception as ex:
434        return {
435            "status": None,
436            "job_id": job_id,
437            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
438            "attempts": [],
439        }

Get the status of a sync job from the Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_deployed_cloud_source_connectors() -> list[airbyte.cloud.connectors.CloudSource]:
442@mcp_tool(
443    domain="cloud",
444    read_only=True,
445    idempotent=True,
446    open_world=True,
447)
448def list_deployed_cloud_source_connectors() -> list[CloudSource]:
449    """List all deployed source connectors in the Airbyte Cloud workspace.
450
451    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
452    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
453    Airbyte Cloud API.
454    """
455    workspace: CloudWorkspace = _get_cloud_workspace()
456    return workspace.list_sources()

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_deployed_cloud_destination_connectors() -> list[airbyte.cloud.connectors.CloudDestination]:
459@mcp_tool(
460    domain="cloud",
461    read_only=True,
462    idempotent=True,
463    open_world=True,
464)
465def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
466    """List all deployed destination connectors in the Airbyte Cloud workspace.
467
468    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
469    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
470    Airbyte Cloud API.
471    """
472    workspace: CloudWorkspace = _get_cloud_workspace()
473    return workspace.list_destinations()

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def get_cloud_sync_logs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None) -> str:
476@mcp_tool(
477    domain="cloud",
478    read_only=True,
479    idempotent=True,
480    open_world=True,
481)
482def get_cloud_sync_logs(
483    connection_id: Annotated[
484        str,
485        Field(description="The ID of the Airbyte Cloud connection."),
486    ],
487    job_id: Annotated[
488        int | None,
489        Field(description="Optional job ID. If not provided, the latest job will be used."),
490    ] = None,
491    attempt_number: Annotated[
492        int | None,
493        Field(
494            description="Optional attempt number. If not provided, the latest attempt will be used."
495        ),
496    ] = None,
497) -> str:
498    """Get the logs from a sync job attempt on Airbyte Cloud.
499
500    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
501    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
502    Airbyte Cloud API.
503    """
504    try:
505        workspace: CloudWorkspace = _get_cloud_workspace()
506        connection = workspace.get_connection(connection_id=connection_id)
507
508        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
509
510        if not sync_result:
511            return f"No sync job found for connection '{connection_id}'"
512
513        attempts = sync_result.get_attempts()
514
515        if not attempts:
516            return f"No attempts found for job '{sync_result.job_id}'"
517
518        if attempt_number is not None:
519            target_attempt = None
520            for attempt in attempts:
521                if attempt.attempt_number == attempt_number:
522                    target_attempt = attempt
523                    break
524
525            if target_attempt is None:
526                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
527        else:
528            target_attempt = max(attempts, key=lambda a: a.attempt_number)
529
530        logs = target_attempt.get_full_log_text()
531
532        if not logs:
533            return (
534                f"No logs available for job '{sync_result.job_id}', "
535                f"attempt {target_attempt.attempt_number}"
536            )
537
538        return logs  # noqa: TRY300
539
540    except Exception as ex:
541        return f"Failed to get logs for connection '{connection_id}': {ex}"

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_deployed_cloud_connections() -> list[airbyte.cloud.CloudConnection]:
544@mcp_tool(
545    domain="cloud",
546    read_only=True,
547    idempotent=True,
548    open_world=True,
549)
550def list_deployed_cloud_connections() -> list[CloudConnection]:
551    """List all deployed connections in the Airbyte Cloud workspace.
552
553    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
554    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
555    Airbyte Cloud API.
556    """
557    workspace: CloudWorkspace = _get_cloud_workspace()
558    return workspace.list_connections()

List all deployed connections in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def publish_custom_source_definition( name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True) -> str:
575@mcp_tool(
576    domain="cloud",
577    open_world=True,
578)
579def publish_custom_source_definition(
580    name: Annotated[
581        str,
582        Field(description="The name for the custom connector definition."),
583    ],
584    *,
585    manifest_yaml: Annotated[
586        str | Path | None,
587        Field(
588            description=(
589                "The Low-code CDK manifest as a YAML string or file path. "
590                "Required for YAML connectors."
591            ),
592            default=None,
593        ),
594    ] = None,
595    unique: Annotated[
596        bool,
597        Field(
598            description="Whether to require a unique name.",
599            default=True,
600        ),
601    ] = True,
602    pre_validate: Annotated[
603        bool,
604        Field(
605            description="Whether to validate the manifest client-side before publishing.",
606            default=True,
607        ),
608    ] = True,
609) -> str:
610    """Publish a custom YAML source connector definition to Airbyte Cloud.
611
612    Note: Only YAML (declarative) connectors are currently supported.
613    Docker-based custom sources are not yet available.
614    """
615    try:
616        processed_manifest = manifest_yaml
617        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
618            processed_manifest = Path(manifest_yaml)
619
620        workspace: CloudWorkspace = _get_cloud_workspace()
621        custom_source = workspace.publish_custom_source_definition(
622            name=name,
623            manifest_yaml=processed_manifest,
624            unique=unique,
625            pre_validate=pre_validate,
626        )
627    except Exception as ex:
628        return f"Failed to publish custom source definition '{name}': {ex}"
629    else:
630        register_guid_created_in_session(custom_source.definition_id)
631        return (
632            "Successfully published custom YAML source definition:\n"
633            + _get_custom_source_definition_description(
634                custom_source=custom_source,
635            )
636            + "\n"
637        )

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions() -> list[dict[str, typing.Any]]:
640@mcp_tool(
641    domain="cloud",
642    read_only=True,
643    idempotent=True,
644    open_world=True,
645)
646def list_custom_source_definitions() -> list[dict[str, Any]]:
647    """List custom YAML source definitions in the Airbyte Cloud workspace.
648
649    Note: Only YAML (declarative) connectors are currently supported.
650    Docker-based custom sources are not yet available.
651    """
652    workspace: CloudWorkspace = _get_cloud_workspace()
653    definitions = workspace.list_custom_source_definitions(
654        definition_type="yaml",
655    )
656
657    return [
658        {
659            "definition_id": d.definition_id,
660            "name": d.name,
661            "version": d.version,
662            "connector_builder_project_url": d.connector_builder_project_url,
663        }
664        for d in definitions
665    ]

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path, FieldInfo(annotation=NoneType, required=True, description='New manifest as YAML string or file path.')], *, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True) -> str:
668@mcp_tool(
669    domain="cloud",
670    destructive=True,
671    open_world=True,
672)
673def update_custom_source_definition(
674    definition_id: Annotated[
675        str,
676        Field(description="The ID of the definition to update."),
677    ],
678    manifest_yaml: Annotated[
679        str | Path,
680        Field(
681            description="New manifest as YAML string or file path.",
682        ),
683    ],
684    *,
685    pre_validate: Annotated[
686        bool,
687        Field(
688            description="Whether to validate the manifest client-side before updating.",
689            default=True,
690        ),
691    ] = True,
692) -> str:
693    """Update a custom YAML source definition in Airbyte Cloud.
694
695    Note: Only YAML (declarative) connectors are currently supported.
696    Docker-based custom sources are not yet available.
697    """
698    check_guid_created_in_session(definition_id)
699    try:
700        processed_manifest = manifest_yaml
701        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
702            processed_manifest = Path(manifest_yaml)
703
704        workspace: CloudWorkspace = _get_cloud_workspace()
705        definition = workspace.get_custom_source_definition(
706            definition_id=definition_id,
707            definition_type="yaml",
708        )
709        custom_source: CustomCloudSourceDefinition = definition.update_definition(
710            manifest_yaml=processed_manifest,
711            pre_validate=pre_validate,
712        )
713    except Exception as ex:
714        return f"Failed to update custom source definition '{definition_id}': {ex}"
715    else:
716        return (
717            "Successfully updated custom YAML source definition:\n"
718            + _get_custom_source_definition_description(
719                custom_source=custom_source,
720            )
721        )

Update a custom YAML source definition in Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def permanently_delete_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')]) -> str:
724@mcp_tool(
725    domain="cloud",
726    destructive=True,
727    open_world=True,
728)
729def permanently_delete_custom_source_definition(
730    definition_id: Annotated[
731        str,
732        Field(description="The ID of the custom source definition to delete."),
733    ],
734) -> str:
735    """Permanently delete a custom YAML source definition from Airbyte Cloud.
736
737    IMPORTANT: This operation requires the connector name to either:
738    1. Start with "delete:" (case insensitive), OR
739    2. Contain "delete-me" (case insensitive)
740
741    If the connector does not meet these requirements, the deletion will be rejected with a
742    helpful error message. Instruct the user to rename the connector appropriately to authorize
743    the deletion.
744
745    Note: Only YAML (declarative) connectors are currently supported.
746    Docker-based custom sources are not yet available.
747    """
748    check_guid_created_in_session(definition_id)
749    workspace: CloudWorkspace = _get_cloud_workspace()
750    definition = workspace.get_custom_source_definition(
751        definition_id=definition_id,
752        definition_type="yaml",
753    )
754    definition_name: str = definition.name  # Capture name before deletion
755    definition.permanently_delete(
756        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
757    )
758    return (
759        f"Successfully deleted custom source definition '{definition_name}' (ID: {definition_id})"
760    )

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to either:

  1. Start with "delete:" (case insensitive), OR
  2. Contain "delete-me" (case insensitive)

If the connector does not meet these requirements, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', open_world=True)
def rename_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')]) -> str:
763@mcp_tool(
764    domain="cloud",
765    open_world=True,
766)
767def rename_cloud_source(
768    source_id: Annotated[
769        str,
770        Field(description="The ID of the deployed source to rename."),
771    ],
772    name: Annotated[
773        str,
774        Field(description="New name for the source."),
775    ],
776) -> str:
777    """Rename a deployed source connector on Airbyte Cloud.
778
779    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
780    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
781    Airbyte Cloud API.
782    """
783    workspace: CloudWorkspace = _get_cloud_workspace()
784    source = workspace.get_source(source_id=source_id)
785    source.rename(name=name)
786    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"

Rename a deployed source connector on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_cloud_source_config( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None) -> str:
789@mcp_tool(
790    domain="cloud",
791    destructive=True,
792    open_world=True,
793)
794def update_cloud_source_config(
795    source_id: Annotated[
796        str,
797        Field(description="The ID of the deployed source to update."),
798    ],
799    config: Annotated[
800        dict | str,
801        Field(
802            description="New configuration for the source connector.",
803        ),
804    ],
805    config_secret_name: Annotated[
806        str | None,
807        Field(
808            description="The name of the secret containing the configuration.",
809            default=None,
810        ),
811    ] = None,
812) -> str:
813    """Update a deployed source connector's configuration on Airbyte Cloud.
814
815    This is a destructive operation that can break existing connections if the
816    configuration is changed incorrectly. Use with caution.
817
818    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
819    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
820    Airbyte Cloud API.
821    """
822    check_guid_created_in_session(source_id)
823    workspace: CloudWorkspace = _get_cloud_workspace()
824    source = workspace.get_source(source_id=source_id)
825
826    config_dict = resolve_config(
827        config=config,
828        config_secret_name=config_secret_name,
829        config_spec_jsonschema=None,  # We don't have the spec here
830    )
831
832    source.update_config(config=config_dict)
833    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def rename_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')]) -> str:
836@mcp_tool(
837    domain="cloud",
838    open_world=True,
839)
840def rename_cloud_destination(
841    destination_id: Annotated[
842        str,
843        Field(description="The ID of the deployed destination to rename."),
844    ],
845    name: Annotated[
846        str,
847        Field(description="New name for the destination."),
848    ],
849) -> str:
850    """Rename a deployed destination connector on Airbyte Cloud.
851
852    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
853    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
854    Airbyte Cloud API.
855    """
856    workspace: CloudWorkspace = _get_cloud_workspace()
857    destination = workspace.get_destination(destination_id=destination_id)
858    destination.rename(name=name)
859    return (
860        f"Successfully renamed destination '{destination_id}' to '{name}'. "
861        f"URL: {destination.connector_url}"
862    )

Rename a deployed destination connector on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_cloud_destination_config( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None) -> str:
865@mcp_tool(
866    domain="cloud",
867    destructive=True,
868    open_world=True,
869)
870def update_cloud_destination_config(
871    destination_id: Annotated[
872        str,
873        Field(description="The ID of the deployed destination to update."),
874    ],
875    config: Annotated[
876        dict | str,
877        Field(
878            description="New configuration for the destination connector.",
879        ),
880    ],
881    config_secret_name: Annotated[
882        str | None,
883        Field(
884            description="The name of the secret containing the configuration.",
885            default=None,
886        ),
887    ] = None,
888) -> str:
889    """Update a deployed destination connector's configuration on Airbyte Cloud.
890
891    This is a destructive operation that can break existing connections if the
892    configuration is changed incorrectly. Use with caution.
893
894    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
895    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
896    Airbyte Cloud API.
897    """
898    check_guid_created_in_session(destination_id)
899    workspace: CloudWorkspace = _get_cloud_workspace()
900    destination = workspace.get_destination(destination_id=destination_id)
901
902    config_dict = resolve_config(
903        config=config,
904        config_secret_name=config_secret_name,
905        config_spec_jsonschema=None,  # We don't have the spec here
906    )
907
908    destination.update_config(config=config_dict)
909    return (
910        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
911    )

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', open_world=True)
def rename_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')]) -> str:
914@mcp_tool(
915    domain="cloud",
916    open_world=True,
917)
918def rename_cloud_connection(
919    connection_id: Annotated[
920        str,
921        Field(description="The ID of the connection to rename."),
922    ],
923    name: Annotated[
924        str,
925        Field(description="New name for the connection."),
926    ],
927) -> str:
928    """Rename a connection on Airbyte Cloud.
929
930    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
931    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
932    Airbyte Cloud API.
933    """
934    workspace: CloudWorkspace = _get_cloud_workspace()
935    connection = workspace.get_connection(connection_id=connection_id)
936    connection.rename(name=name)
937    return (
938        f"Successfully renamed connection '{connection_id}' to '{name}'. "
939        f"URL: {connection.connection_url}"
940    )

Rename a connection on Airbyte Cloud.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def set_cloud_connection_table_prefix( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')]) -> str:
943@mcp_tool(
944    domain="cloud",
945    destructive=True,
946    open_world=True,
947)
948def set_cloud_connection_table_prefix(
949    connection_id: Annotated[
950        str,
951        Field(description="The ID of the connection to update."),
952    ],
953    prefix: Annotated[
954        str,
955        Field(description="New table prefix to use when syncing to the destination."),
956    ],
957) -> str:
958    """Set the table prefix for a connection on Airbyte Cloud.
959
960    This is a destructive operation that can break downstream dependencies if the
961    table prefix is changed incorrectly. Use with caution.
962
963    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
964    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
965    Airbyte Cloud API.
966    """
967    check_guid_created_in_session(connection_id)
968    workspace: CloudWorkspace = _get_cloud_workspace()
969    connection = workspace.get_connection(connection_id=connection_id)
970    connection.set_table_prefix(prefix=prefix)
971    return (
972        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
973        f"URL: {connection.connection_url}"
974    )

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def set_cloud_connection_selected_streams( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')]) -> str:
 977@mcp_tool(
 978    domain="cloud",
 979    destructive=True,
 980    open_world=True,
 981)
 982def set_cloud_connection_selected_streams(
 983    connection_id: Annotated[
 984        str,
 985        Field(description="The ID of the connection to update."),
 986    ],
 987    stream_names: Annotated[
 988        str | list[str],
 989        Field(
 990            description=(
 991                "The selected stream names to sync within the connection. "
 992                "Must be an explicit stream name or list of streams."
 993            )
 994        ),
 995    ],
 996) -> str:
 997    """Set the selected streams for a connection on Airbyte Cloud.
 998
 999    This is a destructive operation that can break existing connections if the
1000    stream selection is changed incorrectly. Use with caution.
1001
1002    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
1003    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
1004    Airbyte Cloud API.
1005    """
1006    check_guid_created_in_session(connection_id)
1007    workspace: CloudWorkspace = _get_cloud_workspace()
1008    connection = workspace.get_connection(connection_id=connection_id)
1009
1010    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
1011    connection.set_selected_streams(stream_names=resolved_streams_list)
1012
1013    return (
1014        f"Successfully set selected streams for connection '{connection_id}' "
1015        f"to {resolved_streams_list}. URL: {connection.connection_url}"
1016    )

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID, and AIRBYTE_API_ROOT environment variables will be used to authenticate with the Airbyte Cloud API.