airbyte.mcp.cloud_ops

Airbyte Cloud MCP operations.

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any, cast
   6
   7from fastmcp import FastMCP
   8from pydantic import BaseModel, Field
   9
  10from airbyte import cloud, get_destination, get_source
  11from airbyte._util import api_util
  12from airbyte.cloud.auth import (
  13    resolve_cloud_api_url,
  14    resolve_cloud_client_id,
  15    resolve_cloud_client_secret,
  16    resolve_cloud_workspace_id,
  17)
  18from airbyte.cloud.connectors import CustomCloudSourceDefinition
  19from airbyte.cloud.constants import FAILED_STATUSES
  20from airbyte.cloud.workspaces import CloudWorkspace
  21from airbyte.destinations.util import get_noop_destination
  22from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  23from airbyte.mcp._tool_utils import (
  24    check_guid_created_in_session,
  25    mcp_tool,
  26    register_guid_created_in_session,
  27    register_tools,
  28)
  29from airbyte.mcp._util import resolve_config, resolve_list_of_strings
  30from airbyte.secrets import SecretString
  31
  32
  33CLOUD_AUTH_TIP_TEXT = (
  34    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
  35    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
  36    "will be used to authenticate with the Airbyte Cloud API."
  37)
  38WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  39
  40
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    # Summary record: `list_deployed_cloud_source_connectors` builds one of these per source.
    # All three fields are required (no defaults).
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.
    # `id` shadows the builtin only inside this class namespace; renaming it would change
    # the serialized field name.

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
  50
  51
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    # Summary record: `list_deployed_cloud_destination_connectors` builds one of these per
    # destination. All three fields are required (no defaults).
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.
    # `id` shadows the builtin only inside this class namespace; renaming it would change
    # the serialized field name.

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
  61
  62
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    # Summary record for a single connection.
    # NOTE(review): presumably the element type returned by `list_deployed_cloud_connections`
    # (which declares a `with_connection_status` parameter) — confirm against that function.
    # The first five fields are required; the five job-status fields all default to None.
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
  90
  91
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    # Returned by `describe_cloud_source`. Unlike `CloudSourceResult`, field names carry a
    # `source_` prefix and the connector definition ID is included.
    # All fields are required (no defaults).
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 103
 104
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    # Returned by `describe_cloud_destination`. Unlike `CloudDestinationResult`, field names
    # carry a `destination_` prefix and the connector definition ID is included.
    # All fields are required (no defaults).
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 116
 117
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    # Returned by `describe_cloud_connection`, which supplies every field explicitly.
    # NOTE(review): `table_prefix` is nullable but declares no default, so whether it may be
    # omitted depends on the pydantic version in use — confirm before relying on it.
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
 139
 140
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    # All three fields are required (no defaults).
    # NOTE(review): no producer of this model is visible in this part of the file;
    # presumably an organization-listing tool further down builds it — confirm.
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.
    # `id` shadows the builtin only inside this class namespace; renaming it would change
    # the serialized field name.

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
 150
 151
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    # All three fields are required (no defaults).
    # NOTE(review): no producer of this model is visible in this part of the file;
    # presumably a workspace-listing tool further down builds it — confirm.
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.
    # `id` shadows the builtin only inside this class namespace; renaming it would change
    # the serialized field name.

    id: str
    """The workspace ID."""
    name: str
    """Display name of the workspace."""
    organization_id: str
    """ID of the organization this workspace belongs to."""
 161
 162
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    # Returned by `get_cloud_sync_logs`. How that function fills the fields:
    #   - `log_text` is the selected lines joined with "\n"; when the attempt has no logs it
    #     is a bracketed placeholder message instead, with both counts set to 0.
    #   - `log_text_start_line` is the 0-based slice start plus one.
    #   - `total_log_lines_available` is the line count of the full log before any limit.
    # NOTE(review): the docstring and the bare-string field docs are left verbatim because
    # schema/doc tooling may surface them — confirm before rewording.

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""
 178
 179
 180def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
 181    """Get an authenticated CloudWorkspace.
 182
 183    Args:
 184        workspace_id: Optional workspace ID. If not provided, uses the
 185            AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
 186    """
 187    return CloudWorkspace(
 188        workspace_id=resolve_cloud_workspace_id(workspace_id),
 189        client_id=resolve_cloud_client_id(),
 190        client_secret=resolve_cloud_client_secret(),
 191        api_root=resolve_cloud_api_url(),
 192    )
 193
 194
 195@mcp_tool(
 196    domain="cloud",
 197    open_world=True,
 198    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 199)
 200def deploy_source_to_cloud(
 201    source_name: Annotated[
 202        str,
 203        Field(description="The name to use when deploying the source."),
 204    ],
 205    source_connector_name: Annotated[
 206        str,
 207        Field(description="The name of the source connector (e.g., 'source-faker')."),
 208    ],
 209    *,
 210    workspace_id: Annotated[
 211        str | None,
 212        Field(
 213            description=WORKSPACE_ID_TIP_TEXT,
 214            default=None,
 215        ),
 216    ],
 217    config: Annotated[
 218        dict | str | None,
 219        Field(
 220            description="The configuration for the source connector.",
 221            default=None,
 222        ),
 223    ],
 224    config_secret_name: Annotated[
 225        str | None,
 226        Field(
 227            description="The name of the secret containing the configuration.",
 228            default=None,
 229        ),
 230    ],
 231    unique: Annotated[
 232        bool,
 233        Field(
 234            description="Whether to require a unique name.",
 235            default=True,
 236        ),
 237    ],
 238) -> str:
 239    """Deploy a source connector to Airbyte Cloud."""
 240    source = get_source(
 241        source_connector_name,
 242        no_executor=True,
 243    )
 244    config_dict = resolve_config(
 245        config=config,
 246        config_secret_name=config_secret_name,
 247        config_spec_jsonschema=source.config_spec,
 248    )
 249    source.set_config(config_dict, validate=True)
 250
 251    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 252    deployed_source = workspace.deploy_source(
 253        name=source_name,
 254        source=source,
 255        unique=unique,
 256    )
 257
 258    register_guid_created_in_session(deployed_source.connector_id)
 259    return (
 260        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 261        f" and URL: {deployed_source.connector_url}"
 262    )
 263
 264
 265@mcp_tool(
 266    domain="cloud",
 267    open_world=True,
 268    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 269)
 270def deploy_destination_to_cloud(
 271    destination_name: Annotated[
 272        str,
 273        Field(description="The name to use when deploying the destination."),
 274    ],
 275    destination_connector_name: Annotated[
 276        str,
 277        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 278    ],
 279    *,
 280    workspace_id: Annotated[
 281        str | None,
 282        Field(
 283            description=WORKSPACE_ID_TIP_TEXT,
 284            default=None,
 285        ),
 286    ],
 287    config: Annotated[
 288        dict | str | None,
 289        Field(
 290            description="The configuration for the destination connector.",
 291            default=None,
 292        ),
 293    ],
 294    config_secret_name: Annotated[
 295        str | None,
 296        Field(
 297            description="The name of the secret containing the configuration.",
 298            default=None,
 299        ),
 300    ],
 301    unique: Annotated[
 302        bool,
 303        Field(
 304            description="Whether to require a unique name.",
 305            default=True,
 306        ),
 307    ],
 308) -> str:
 309    """Deploy a destination connector to Airbyte Cloud."""
 310    destination = get_destination(
 311        destination_connector_name,
 312        no_executor=True,
 313    )
 314    config_dict = resolve_config(
 315        config=config,
 316        config_secret_name=config_secret_name,
 317        config_spec_jsonschema=destination.config_spec,
 318    )
 319    destination.set_config(config_dict, validate=True)
 320
 321    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 322    deployed_destination = workspace.deploy_destination(
 323        name=destination_name,
 324        destination=destination,
 325        unique=unique,
 326    )
 327
 328    register_guid_created_in_session(deployed_destination.connector_id)
 329    return (
 330        f"Successfully deployed destination '{destination_name}' "
 331        f"with ID: {deployed_destination.connector_id}"
 332    )
 333
 334
 335@mcp_tool(
 336    domain="cloud",
 337    open_world=True,
 338    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 339)
 340def create_connection_on_cloud(
 341    connection_name: Annotated[
 342        str,
 343        Field(description="The name of the connection."),
 344    ],
 345    source_id: Annotated[
 346        str,
 347        Field(description="The ID of the deployed source."),
 348    ],
 349    destination_id: Annotated[
 350        str,
 351        Field(description="The ID of the deployed destination."),
 352    ],
 353    selected_streams: Annotated[
 354        str | list[str],
 355        Field(
 356            description=(
 357                "The selected stream names to sync within the connection. "
 358                "Must be an explicit stream name or list of streams. "
 359                "Cannot be empty or '*'."
 360            )
 361        ),
 362    ],
 363    *,
 364    workspace_id: Annotated[
 365        str | None,
 366        Field(
 367            description=WORKSPACE_ID_TIP_TEXT,
 368            default=None,
 369        ),
 370    ],
 371    table_prefix: Annotated[
 372        str | None,
 373        Field(
 374            description="Optional table prefix to use when syncing to the destination.",
 375            default=None,
 376        ),
 377    ],
 378) -> str:
 379    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 380    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 381    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 382    deployed_connection = workspace.deploy_connection(
 383        connection_name=connection_name,
 384        source=source_id,
 385        destination=destination_id,
 386        selected_streams=resolved_streams_list,
 387        table_prefix=table_prefix,
 388    )
 389
 390    register_guid_created_in_session(deployed_connection.connection_id)
 391    return (
 392        f"Successfully created connection '{connection_name}' "
 393        f"with ID '{deployed_connection.connection_id}' and "
 394        f"URL: {deployed_connection.connection_url}"
 395    )
 396
 397
 398@mcp_tool(
 399    domain="cloud",
 400    open_world=True,
 401    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 402)
 403def run_cloud_sync(
 404    connection_id: Annotated[
 405        str,
 406        Field(description="The ID of the Airbyte Cloud connection."),
 407    ],
 408    *,
 409    workspace_id: Annotated[
 410        str | None,
 411        Field(
 412            description=WORKSPACE_ID_TIP_TEXT,
 413            default=None,
 414        ),
 415    ],
 416    wait: Annotated[
 417        bool,
 418        Field(
 419            description=(
 420                "Whether to wait for the sync to complete. Since a sync can take between several "
 421                "minutes and several hours, this option is not recommended for most "
 422                "scenarios."
 423            ),
 424            default=False,
 425        ),
 426    ],
 427    wait_timeout: Annotated[
 428        int,
 429        Field(
 430            description="Maximum time to wait for sync completion (seconds).",
 431            default=300,
 432        ),
 433    ],
 434) -> str:
 435    """Run a sync job on Airbyte Cloud."""
 436    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 437    connection = workspace.get_connection(connection_id=connection_id)
 438    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 439
 440    if wait:
 441        status = sync_result.get_job_status()
 442        return (
 443            f"Sync completed with status: {status}. "
 444            f"Job ID is '{sync_result.job_id}' and "
 445            f"job URL is: {sync_result.job_url}"
 446        )
 447    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 448
 449
 450@mcp_tool(
 451    domain="cloud",
 452    read_only=True,
 453    idempotent=True,
 454    open_world=True,
 455    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 456)
 457def check_airbyte_cloud_workspace(
 458    *,
 459    workspace_id: Annotated[
 460        str | None,
 461        Field(
 462            description=WORKSPACE_ID_TIP_TEXT,
 463            default=None,
 464        ),
 465    ],
 466) -> str:
 467    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 468
 469    Returns workspace ID and workspace URL for verification.
 470    """
 471    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 472    workspace.connect()
 473
 474    return (
 475        f"✅ Successfully connected to Airbyte Cloud workspace.\n"
 476        f"Workspace ID: {workspace.workspace_id}\n"
 477        f"Workspace URL: {workspace.workspace_url}"
 478    )
 479
 480
 481@mcp_tool(
 482    domain="cloud",
 483    open_world=True,
 484    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 485)
 486def deploy_noop_destination_to_cloud(
 487    name: str = "No-op Destination",
 488    *,
 489    workspace_id: Annotated[
 490        str | None,
 491        Field(
 492            description=WORKSPACE_ID_TIP_TEXT,
 493            default=None,
 494        ),
 495    ],
 496    unique: bool = True,
 497) -> str:
 498    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 499    destination = get_noop_destination()
 500    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 501    deployed_destination = workspace.deploy_destination(
 502        name=name,
 503        destination=destination,
 504        unique=unique,
 505    )
 506    register_guid_created_in_session(deployed_destination.connector_id)
 507    return (
 508        f"Successfully deployed No-op Destination "
 509        f"with ID '{deployed_destination.connector_id}' and "
 510        f"URL: {deployed_destination.connector_url}"
 511    )
 512
 513
 514@mcp_tool(
 515    domain="cloud",
 516    read_only=True,
 517    idempotent=True,
 518    open_world=True,
 519    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 520)
 521def get_cloud_sync_status(
 522    connection_id: Annotated[
 523        str,
 524        Field(
 525            description="The ID of the Airbyte Cloud connection.",
 526        ),
 527    ],
 528    job_id: Annotated[
 529        int | None,
 530        Field(
 531            description="Optional job ID. If not provided, the latest job will be used.",
 532            default=None,
 533        ),
 534    ],
 535    *,
 536    workspace_id: Annotated[
 537        str | None,
 538        Field(
 539            description=WORKSPACE_ID_TIP_TEXT,
 540            default=None,
 541        ),
 542    ],
 543    include_attempts: Annotated[
 544        bool,
 545        Field(
 546            description="Whether to include detailed attempts information.",
 547            default=False,
 548        ),
 549    ],
 550) -> dict[str, Any]:
 551    """Get the status of a sync job from the Airbyte Cloud."""
 552    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 553    connection = workspace.get_connection(connection_id=connection_id)
 554
 555    # If a job ID is provided, get the job by ID.
 556    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 557
 558    if not sync_result:
 559        return {"status": None, "job_id": None, "attempts": []}
 560
 561    result = {
 562        "status": sync_result.get_job_status(),
 563        "job_id": sync_result.job_id,
 564        "bytes_synced": sync_result.bytes_synced,
 565        "records_synced": sync_result.records_synced,
 566        "start_time": sync_result.start_time.isoformat(),
 567        "job_url": sync_result.job_url,
 568        "attempts": [],
 569    }
 570
 571    if include_attempts:
 572        attempts = sync_result.get_attempts()
 573        result["attempts"] = [
 574            {
 575                "attempt_number": attempt.attempt_number,
 576                "attempt_id": attempt.attempt_id,
 577                "status": attempt.status,
 578                "bytes_synced": attempt.bytes_synced,
 579                "records_synced": attempt.records_synced,
 580                "created_at": attempt.created_at.isoformat(),
 581            }
 582            for attempt in attempts
 583        ]
 584
 585    return result
 586
 587
 588@mcp_tool(
 589    domain="cloud",
 590    read_only=True,
 591    idempotent=True,
 592    open_world=True,
 593    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 594)
 595def list_deployed_cloud_source_connectors(
 596    *,
 597    workspace_id: Annotated[
 598        str | None,
 599        Field(
 600            description=WORKSPACE_ID_TIP_TEXT,
 601            default=None,
 602        ),
 603    ],
 604    name_contains: Annotated[
 605        str | None,
 606        Field(
 607            description="Optional case-insensitive substring to filter sources by name",
 608            default=None,
 609        ),
 610    ],
 611    max_items_limit: Annotated[
 612        int | None,
 613        Field(
 614            description="Optional maximum number of items to return (default: no limit)",
 615            default=None,
 616        ),
 617    ],
 618) -> list[CloudSourceResult]:
 619    """List all deployed source connectors in the Airbyte Cloud workspace."""
 620    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 621    sources = workspace.list_sources()
 622
 623    # Filter by name if requested
 624    if name_contains:
 625        needle = name_contains.lower()
 626        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 627
 628    # Apply limit if requested
 629    if max_items_limit is not None:
 630        sources = sources[:max_items_limit]
 631
 632    # Note: name and url are guaranteed non-null from list API responses
 633    return [
 634        CloudSourceResult(
 635            id=source.source_id,
 636            name=cast(str, source.name),
 637            url=cast(str, source.connector_url),
 638        )
 639        for source in sources
 640    ]
 641
 642
 643@mcp_tool(
 644    domain="cloud",
 645    read_only=True,
 646    idempotent=True,
 647    open_world=True,
 648    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 649)
 650def list_deployed_cloud_destination_connectors(
 651    *,
 652    workspace_id: Annotated[
 653        str | None,
 654        Field(
 655            description=WORKSPACE_ID_TIP_TEXT,
 656            default=None,
 657        ),
 658    ],
 659    name_contains: Annotated[
 660        str | None,
 661        Field(
 662            description="Optional case-insensitive substring to filter destinations by name",
 663            default=None,
 664        ),
 665    ],
 666    max_items_limit: Annotated[
 667        int | None,
 668        Field(
 669            description="Optional maximum number of items to return (default: no limit)",
 670            default=None,
 671        ),
 672    ],
 673) -> list[CloudDestinationResult]:
 674    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 675    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 676    destinations = workspace.list_destinations()
 677
 678    # Filter by name if requested
 679    if name_contains:
 680        needle = name_contains.lower()
 681        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 682
 683    # Apply limit if requested
 684    if max_items_limit is not None:
 685        destinations = destinations[:max_items_limit]
 686
 687    # Note: name and url are guaranteed non-null from list API responses
 688    return [
 689        CloudDestinationResult(
 690            id=destination.destination_id,
 691            name=cast(str, destination.name),
 692            url=cast(str, destination.connector_url),
 693        )
 694        for destination in destinations
 695    ]
 696
 697
 698@mcp_tool(
 699    domain="cloud",
 700    read_only=True,
 701    idempotent=True,
 702    open_world=True,
 703    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 704)
 705def describe_cloud_source(
 706    source_id: Annotated[
 707        str,
 708        Field(description="The ID of the source to describe."),
 709    ],
 710    *,
 711    workspace_id: Annotated[
 712        str | None,
 713        Field(
 714            description=WORKSPACE_ID_TIP_TEXT,
 715            default=None,
 716        ),
 717    ],
 718) -> CloudSourceDetails:
 719    """Get detailed information about a specific deployed source connector."""
 720    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 721    source = workspace.get_source(source_id=source_id)
 722
 723    # Access name property to ensure _connector_info is populated
 724    source_name = cast(str, source.name)
 725
 726    return CloudSourceDetails(
 727        source_id=source.source_id,
 728        source_name=source_name,
 729        source_url=source.connector_url,
 730        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 731    )
 732
 733
 734@mcp_tool(
 735    domain="cloud",
 736    read_only=True,
 737    idempotent=True,
 738    open_world=True,
 739    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 740)
 741def describe_cloud_destination(
 742    destination_id: Annotated[
 743        str,
 744        Field(description="The ID of the destination to describe."),
 745    ],
 746    *,
 747    workspace_id: Annotated[
 748        str | None,
 749        Field(
 750            description=WORKSPACE_ID_TIP_TEXT,
 751            default=None,
 752        ),
 753    ],
 754) -> CloudDestinationDetails:
 755    """Get detailed information about a specific deployed destination connector."""
 756    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 757    destination = workspace.get_destination(destination_id=destination_id)
 758
 759    # Access name property to ensure _connector_info is populated
 760    destination_name = cast(str, destination.name)
 761
 762    return CloudDestinationDetails(
 763        destination_id=destination.destination_id,
 764        destination_name=destination_name,
 765        destination_url=destination.connector_url,
 766        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 767    )
 768
 769
 770@mcp_tool(
 771    domain="cloud",
 772    read_only=True,
 773    idempotent=True,
 774    open_world=True,
 775    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 776)
 777def describe_cloud_connection(
 778    connection_id: Annotated[
 779        str,
 780        Field(description="The ID of the connection to describe."),
 781    ],
 782    *,
 783    workspace_id: Annotated[
 784        str | None,
 785        Field(
 786            description=WORKSPACE_ID_TIP_TEXT,
 787            default=None,
 788        ),
 789    ],
 790) -> CloudConnectionDetails:
 791    """Get detailed information about a specific deployed connection."""
 792    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 793    connection = workspace.get_connection(connection_id=connection_id)
 794
 795    return CloudConnectionDetails(
 796        connection_id=connection.connection_id,
 797        connection_name=cast(str, connection.name),
 798        connection_url=cast(str, connection.connection_url),
 799        source_id=connection.source_id,
 800        source_name=cast(str, connection.source.name),
 801        destination_id=connection.destination_id,
 802        destination_name=cast(str, connection.destination.name),
 803        selected_streams=connection.stream_names,
 804        table_prefix=connection.table_prefix,
 805    )
 806
 807
 808@mcp_tool(
 809    domain="cloud",
 810    read_only=True,
 811    idempotent=True,
 812    open_world=True,
 813    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 814)
 815def get_cloud_sync_logs(
 816    connection_id: Annotated[
 817        str,
 818        Field(description="The ID of the Airbyte Cloud connection."),
 819    ],
 820    job_id: Annotated[
 821        int | None,
 822        Field(description="Optional job ID. If not provided, the latest job will be used."),
 823    ] = None,
 824    attempt_number: Annotated[
 825        int | None,
 826        Field(
 827            description="Optional attempt number. If not provided, the latest attempt will be used."
 828        ),
 829    ] = None,
 830    *,
 831    workspace_id: Annotated[
 832        str | None,
 833        Field(
 834            description=WORKSPACE_ID_TIP_TEXT,
 835            default=None,
 836        ),
 837    ],
 838    max_lines: Annotated[
 839        int,
 840        Field(
 841            description=(
 842                "Maximum number of lines to return. "
 843                "Defaults to 4000 if not specified. "
 844                "If '0' is provided, no limit is applied."
 845            ),
 846            default=4000,
 847        ),
 848    ],
 849    from_tail: Annotated[
 850        bool | None,
 851        Field(
 852            description=(
 853                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
 854                "Defaults to True if `line_offset` is not specified. "
 855                "Cannot combine `from_tail=True` with `line_offset`."
 856            ),
 857            default=None,
 858        ),
 859    ],
 860    line_offset: Annotated[
 861        int | None,
 862        Field(
 863            description=(
 864                "Number of lines to skip from the beginning of the logs. "
 865                "Cannot be combined with `from_tail=True`."
 866            ),
 867            default=None,
 868        ),
 869    ],
 870) -> LogReadResult:
 871    """Get the logs from a sync job attempt on Airbyte Cloud."""
 872    # Validate that line_offset and from_tail are not both set
 873    if line_offset is not None and from_tail:
 874        raise PyAirbyteInputError(
 875            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
 876            context={"line_offset": line_offset, "from_tail": from_tail},
 877        )
 878
 879    if from_tail is None and line_offset is None:
 880        from_tail = True
 881    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 882    connection = workspace.get_connection(connection_id=connection_id)
 883
 884    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 885
 886    if not sync_result:
 887        raise AirbyteMissingResourceError(
 888            resource_type="sync job",
 889            resource_name_or_id=connection_id,
 890        )
 891
 892    attempts = sync_result.get_attempts()
 893
 894    if not attempts:
 895        raise AirbyteMissingResourceError(
 896            resource_type="sync attempt",
 897            resource_name_or_id=str(sync_result.job_id),
 898        )
 899
 900    if attempt_number is not None:
 901        target_attempt = None
 902        for attempt in attempts:
 903            if attempt.attempt_number == attempt_number:
 904                target_attempt = attempt
 905                break
 906
 907        if target_attempt is None:
 908            raise AirbyteMissingResourceError(
 909                resource_type="sync attempt",
 910                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
 911            )
 912    else:
 913        target_attempt = max(attempts, key=lambda a: a.attempt_number)
 914
 915    logs = target_attempt.get_full_log_text()
 916
 917    if not logs:
 918        # Return empty result with zero lines
 919        return LogReadResult(
 920            log_text=(
 921                f"[No logs available for job '{sync_result.job_id}', "
 922                f"attempt {target_attempt.attempt_number}.]"
 923            ),
 924            log_text_start_line=1,
 925            log_text_line_count=0,
 926            total_log_lines_available=0,
 927            job_id=sync_result.job_id,
 928            attempt_number=target_attempt.attempt_number,
 929        )
 930
 931    # Apply line limiting
 932    log_lines = logs.splitlines()
 933    total_lines = len(log_lines)
 934
 935    # Determine effective max_lines (0 means no limit)
 936    effective_max = total_lines if max_lines == 0 else max_lines
 937
 938    # Calculate start_index and slice based on from_tail or line_offset
 939    if from_tail:
 940        start_index = max(0, total_lines - effective_max)
 941        selected_lines = log_lines[start_index:][:effective_max]
 942    else:
 943        start_index = line_offset or 0
 944        selected_lines = log_lines[start_index : start_index + effective_max]
 945
 946    return LogReadResult(
 947        log_text="\n".join(selected_lines),
 948        log_text_start_line=start_index + 1,  # Convert to 1-based index
 949        log_text_line_count=len(selected_lines),
 950        total_log_lines_available=total_lines,
 951        job_id=sync_result.job_id,
 952        attempt_number=target_attempt.attempt_number,
 953    )
 954
 955
 956@mcp_tool(
 957    domain="cloud",
 958    read_only=True,
 959    idempotent=True,
 960    open_world=True,
 961    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 962)
 963def list_deployed_cloud_connections(
 964    *,
 965    workspace_id: Annotated[
 966        str | None,
 967        Field(
 968            description=WORKSPACE_ID_TIP_TEXT,
 969            default=None,
 970        ),
 971    ],
 972    name_contains: Annotated[
 973        str | None,
 974        Field(
 975            description="Optional case-insensitive substring to filter connections by name",
 976            default=None,
 977        ),
 978    ],
 979    max_items_limit: Annotated[
 980        int | None,
 981        Field(
 982            description="Optional maximum number of items to return (default: no limit)",
 983            default=None,
 984        ),
 985    ],
 986    with_connection_status: Annotated[
 987        bool | None,
 988        Field(
 989            description="If True, include status info for each connection's most recent sync job",
 990            default=False,
 991        ),
 992    ],
 993    failing_connections_only: Annotated[
 994        bool | None,
 995        Field(
 996            description="If True, only return connections with failed/cancelled last sync",
 997            default=False,
 998        ),
 999    ],
1000) -> list[CloudConnectionResult]:
1001    """List all deployed connections in the Airbyte Cloud workspace.
1002
1003    When with_connection_status is True, each connection result will include
1004    information about the most recent sync job status, skipping over any
1005    currently in-progress syncs to find the last completed job.
1006
1007    When failing_connections_only is True, only connections where the most
1008    recent completed sync job failed or was cancelled will be returned.
1009    This implicitly enables with_connection_status.
1010    """
1011    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1012    connections = workspace.list_connections()
1013
1014    # Filter by name if requested
1015    if name_contains:
1016        needle = name_contains.lower()
1017        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1018
1019    # If failing_connections_only is True, implicitly enable with_connection_status
1020    if failing_connections_only:
1021        with_connection_status = True
1022
1023    results: list[CloudConnectionResult] = []
1024
1025    for connection in connections:
1026        last_job_status: str | None = None
1027        last_job_id: int | None = None
1028        last_job_time: str | None = None
1029        currently_running_job_id: int | None = None
1030        currently_running_job_start_time: str | None = None
1031
1032        if with_connection_status:
1033            sync_logs = connection.get_previous_sync_logs(limit=5)
1034            last_completed_job_status = None  # Keep enum for comparison
1035
1036            for sync_result in sync_logs:
1037                job_status = sync_result.get_job_status()
1038
1039                if not sync_result.is_job_complete():
1040                    currently_running_job_id = sync_result.job_id
1041                    currently_running_job_start_time = sync_result.start_time.isoformat()
1042                    continue
1043
1044                last_completed_job_status = job_status
1045                last_job_status = str(job_status.value) if job_status else None
1046                last_job_id = sync_result.job_id
1047                last_job_time = sync_result.start_time.isoformat()
1048                break
1049
1050            if failing_connections_only and (
1051                last_completed_job_status is None
1052                or last_completed_job_status not in FAILED_STATUSES
1053            ):
1054                continue
1055
1056        results.append(
1057            CloudConnectionResult(
1058                id=connection.connection_id,
1059                name=cast(str, connection.name),
1060                url=cast(str, connection.connection_url),
1061                source_id=connection.source_id,
1062                destination_id=connection.destination_id,
1063                last_job_status=last_job_status,
1064                last_job_id=last_job_id,
1065                last_job_time=last_job_time,
1066                currently_running_job_id=currently_running_job_id,
1067                currently_running_job_start_time=currently_running_job_start_time,
1068            )
1069        )
1070
1071        if max_items_limit is not None and len(results) >= max_items_limit:
1072            break
1073
1074    return results
1075
1076
def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString,
    client_secret: SecretString,
) -> api_util.models.OrganizationResponse:
    """Look up one of the current user's organizations by ID or by exact name.

    Exactly one of `organization_id` / `organization_name` must be given. The
    user's organizations are listed via the API and then filtered locally.

    Args:
        organization_id: The organization ID to look for.
        organization_name: The organization name to look for (exact, case-sensitive).
        api_root: The API root URL.
        client_id: OAuth client ID.
        client_secret: OAuth client secret.

    Returns:
        The matching OrganizationResponse object.

    Raises:
        PyAirbyteInputError: If both or neither identifier is provided, or if
            more than one organization carries the requested name.
        AirbyteMissingResourceError: If no organization matches the ID or name.
    """
    has_id = bool(organization_id)
    has_name = bool(organization_name)

    if has_id and has_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not (has_id or has_name):
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Fetch every organization visible to the authenticated user.
    candidates = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    )

    if has_id:
        id_matches = [
            candidate for candidate in candidates if candidate.organization_id == organization_id
        ]
        if id_matches:
            return id_matches[0]
        raise AirbyteMissingResourceError(
            resource_type="organization",
            context={
                "organization_id": organization_id,
                "message": f"No organization found with ID '{organization_id}' "
                "for the current user.",
            },
        )

    # Name lookup: exact, case-sensitive comparison.
    name_matches = [
        candidate for candidate in candidates if candidate.organization_name == organization_name
    ]
    match_count = len(name_matches)

    if match_count == 0:
        raise AirbyteMissingResourceError(
            resource_type="organization",
            context={
                "organization_name": organization_name,
                "message": f"No organization found with exact name '{organization_name}' "
                "for the current user.",
            },
        )

    if match_count > 1:
        # Ambiguous name: the caller has to disambiguate by ID.
        raise PyAirbyteInputError(
            message=f"Multiple organizations found with name '{organization_name}'. "
            "Please use 'organization_id' instead to specify the exact organization."
        )

    return name_matches[0]
1152
1153
def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString,
    client_secret: SecretString,
) -> str:
    """Return the ID of the organization identified by ID or exact name.

    Delegates the lookup (and all validation/errors) to `_resolve_organization`
    and returns only the `organization_id` of the result.
    """
    return _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    ).organization_id
1174
1175
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List the workspaces that belong to one specific organization.

    Either organization_id OR organization_name (exact match) must be provided.
    This tool does NOT list workspaces across all organizations - the target
    organization always has to be specified.
    """
    cloud_api_root = resolve_cloud_api_url()
    oauth_client_id = resolve_cloud_client_id()
    oauth_client_secret = resolve_cloud_client_secret()

    # Turn whichever identifier was supplied into a concrete organization ID.
    target_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=cloud_api_root,
        client_id=oauth_client_id,
        client_secret=oauth_client_secret,
    )

    # The name filter and the item cap are forwarded to the API helper.
    raw_workspaces = api_util.list_workspaces_in_organization(
        organization_id=target_org_id,
        api_root=cloud_api_root,
        client_id=oauth_client_id,
        client_secret=oauth_client_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    # Missing keys in a workspace record fall back to empty strings.
    return [
        CloudWorkspaceResult(
            id=record.get("workspaceId", ""),
            name=record.get("name", ""),
            organization_id=record.get("organizationId", ""),
        )
        for record in raw_workspaces
    ]
1251
1252
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Return the ID, name and email of a specific organization.

    Either organization_id OR organization_name (exact match) must be provided.
    Useful for finding an organization's ID from its name, or the other way around.
    """
    # Credentials are resolved in the same order as the keyword arguments appear.
    resolved = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=resolve_cloud_api_url(),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
    )

    return CloudOrganizationResult(
        id=resolved.organization_id,
        name=resolved.organization_name,
        email=resolved.email,
    )
1301
1302
def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    """Render a custom source definition as a newline-separated bullet list.

    Each line has the form ` - <label>: <value>` and there is no trailing newline.
    """
    labelled_values = (
        ("Custom Source Name", custom_source.name),
        ("Definition ID", custom_source.definition_id),
        ("Definition Version", custom_source.version),
        ("Connector Builder Project ID", custom_source.connector_builder_project_id),
        ("Connector Builder Project URL", custom_source.connector_builder_project_url),
    )
    return "\n".join(f" - {label}: {value}" for label, value in labelled_values)
1315
1316
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A string without any newline is treated as a file path rather than inline YAML.
    manifest_input = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    published = _get_cloud_workspace(workspace_id).publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest_input,
        unique=unique,
        pre_validate=pre_validate,
    )
    # Record the new definition ID as created in this session.
    register_guid_created_in_session(published.definition_id)

    summary = _get_custom_source_definition_description(custom_source=published)
    return f"Successfully published custom YAML source definition:\n{summary}\n"
1384
1385
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List the custom YAML source definitions of the Airbyte Cloud workspace.

    Each entry is a dict with the definition's ID, name, version and
    Connector Builder project URL.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    yaml_definitions = _get_cloud_workspace(workspace_id).list_custom_source_definitions(
        definition_type="yaml",
    )

    summaries: list[dict[str, Any]] = []
    for definition in yaml_definitions:
        summary: dict[str, Any] = {
            "definition_id": definition.definition_id,
            "name": definition.name,
            "version": definition.version,
            "connector_builder_project_url": definition.connector_builder_project_url,
        }
        summaries.append(summary)
    return summaries
1421
1422
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path,
        Field(
            description="New manifest as YAML string or file path.",
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
) -> str:
    """Update an existing custom YAML source definition in Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Session guard runs first, before anything is read or sent.
    check_guid_created_in_session(definition_id)

    # A string without any newline is treated as a file path rather than inline YAML.
    manifest_input = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    existing_definition = _get_cloud_workspace(workspace_id).get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    updated: CustomCloudSourceDefinition = existing_definition.update_definition(
        manifest_yaml=manifest_input,
        pre_validate=pre_validate,
    )

    summary = _get_custom_source_definition_description(custom_source=updated)
    return f"Successfully updated custom YAML source definition:\n{summary}"
1480
1481
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)

    target_definition = _get_cloud_workspace(workspace_id).get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    existing_name: str = target_definition.name

    if existing_name == name:
        # Safe mode is hard-coded for extra protection when running in LLM agents.
        target_definition.permanently_delete(
            safe_mode=True,
        )
        return (
            f"Successfully deleted custom source definition '{existing_name}' "
            f"(ID: {definition_id})"
        )

    # The caller-supplied name does not match: refuse to delete.
    raise PyAirbyteInputError(
        message=(
            f"Name mismatch: expected '{name}' but found '{existing_name}'. "
            "The provided name must exactly match the definition's actual name. "
            "This is a safety measure to prevent accidental deletion."
        ),
        context={
            "definition_id": definition_id,
            "expected_name": name,
            "actual_name": existing_name,
        },
    )
1547
1548
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the source's actual name.
    """
    check_guid_created_in_session(source_id)
    # Honor an explicit workspace like the other Cloud tools do; omitting it keeps
    # the previous behavior of using the default workspace.
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
1603
1604
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the destination's actual name.
    """
    check_guid_created_in_session(destination_id)
    # Honor an explicit workspace like the other Cloud tools do; omitting it keeps
    # the previous behavior of using the default workspace.
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
1659
1660
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the connection's actual name.
    """
    check_guid_created_in_session(connection_id)
    # Honor an explicit workspace like the other Cloud tools do; omitting it keeps
    # the previous behavior of using the default workspace.
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, connection.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_connection(
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
1736
1737
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Give a deployed source connector on Airbyte Cloud a new name."""
    cloud_source = _get_cloud_workspace(workspace_id).get_source(source_id=source_id)
    cloud_source.rename(name=name)
    # The URL is read after the rename so the message reflects the updated source object.
    return (
        f"Successfully renamed source '{source_id}' to '{name}'. "
        f"URL: {cloud_source.connector_url}"
    )
1766
1767
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Replace the configuration of a deployed source connector on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    cloud_source = _get_cloud_workspace(workspace_id).get_source(source_id=source_id)

    # No connector spec is available at this point, so no JSON schema is passed.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_source.update_config(config=resolved_config)

    return f"Successfully updated source '{source_id}'. URL: {cloud_source.connector_url}"
1818
1819
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[str, Field(description="New name for the destination.")],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    # Look up the deployed destination in the resolved workspace, then rename it.
    cloud_destination = _get_cloud_workspace(workspace_id).get_destination(
        destination_id=destination_id,
    )
    cloud_destination.rename(name=name)

    # The confirmation echoes the caller-supplied ID and new name plus the web URL.
    summary = f"Successfully renamed destination '{destination_id}' to '{name}'. "
    return summary + f"URL: {cloud_destination.connector_url}"
1851
1852
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(description="New configuration for the destination connector."),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(description="The name of the secret containing the configuration.", default=None),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Session guard runs first, before any workspace lookup is attempted.
    # NOTE(review): presumably this rejects IDs not created in the current session
    # when safe mode is enabled - confirm against `_tool_utils`.
    check_guid_created_in_session(destination_id)

    cloud_destination = _get_cloud_workspace(workspace_id).get_destination(
        destination_id=destination_id,
    )

    # The connector spec is not available here, so no JSON schema is passed along
    # when resolving the config.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_destination.update_config(config=resolved_config)

    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {cloud_destination.connector_url}"
    )
1905
1906
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[str, Field(description="The ID of the connection to rename.")],
    name: Annotated[str, Field(description="New name for the connection.")],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    # Look up the connection in the resolved workspace, then rename it.
    cloud_connection = _get_cloud_workspace(workspace_id).get_connection(
        connection_id=connection_id,
    )
    cloud_connection.rename(name=name)

    # The confirmation echoes the caller-supplied ID and new name plus the web URL.
    summary = f"Successfully renamed connection '{connection_id}' to '{name}'. "
    return summary + f"URL: {cloud_connection.connection_url}"
1938
1939
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[str, Field(description="The ID of the connection to update.")],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # Session guard runs first, before any workspace lookup is attempted.
    # NOTE(review): presumably this rejects IDs not created in the current session
    # when safe mode is enabled - confirm against `_tool_utils`.
    check_guid_created_in_session(connection_id)

    cloud_connection = _get_cloud_workspace(workspace_id).get_connection(
        connection_id=connection_id,
    )
    cloud_connection.set_table_prefix(prefix=prefix)

    summary = f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
    return summary + f"URL: {cloud_connection.connection_url}"
1977
1978
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[str, Field(description="The ID of the connection to update.")],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            ),
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # Session guard runs first, before any workspace lookup is attempted.
    # NOTE(review): presumably this rejects IDs not created in the current session
    # when safe mode is enabled - confirm against `_tool_utils`.
    check_guid_created_in_session(connection_id)

    cloud_connection = _get_cloud_workspace(workspace_id).get_connection(
        connection_id=connection_id,
    )

    # The input may be a single string or a list; normalize it to a list of names
    # before handing it to the connection.
    streams: list[str] = resolve_list_of_strings(stream_names)
    cloud_connection.set_selected_streams(stream_names=streams)

    summary = f"Successfully set selected streams for connection '{connection_id}' "
    return summary + f"to {streams}. URL: {cloud_connection.connection_url}"
2024
2025
def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.

    Tools are filtered based on mode settings:
    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
      operations are protected by runtime session checks
    """
    # Registration is delegated to the shared helper, scoped to the "cloud" domain,
    # which is the same domain value every tool in this module passes to `@mcp_tool`.
    register_tools(app, domain="cloud")
# Help text appended to cloud tools (passed as `extra_help_text`) describing the
# environment variables used for authentication.
CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
# Shared description for the optional `workspace_id` tool parameter.
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
class CloudSourceResult(pydantic.main.BaseModel):
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    # All three fields are plain `str` with no default, so each one is required
    # when the model is constructed.
    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""

Information about a deployed source connector in Airbyte Cloud.

id: str = PydanticUndefined

The source ID.

name: str = PydanticUndefined

Display name of the source.

url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

class CloudDestinationResult(pydantic.main.BaseModel):
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    # All three fields are plain `str` with no default, so each one is required
    # when the model is constructed.
    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""

Information about a deployed destination connector in Airbyte Cloud.

id: str = PydanticUndefined

The destination ID.

name: str = PydanticUndefined

Display name of the destination.

url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

class CloudConnectionResult(pydantic.main.BaseModel):
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    # Required identity and linkage fields (no defaults).
    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    # Optional job-status fields: every one defaults to None and, per the field
    # docs below, is only filled in when with_connection_status=True.
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""

Information about a deployed connection in Airbyte Cloud.

id: str = PydanticUndefined

The connection ID.

name: str = PydanticUndefined

Display name of the connection.

url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

last_job_status: str | None = None

Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.

last_job_id: int | None = None

Job ID of the most recent completed sync. Only populated when with_connection_status=True.

last_job_time: str | None = None

ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.

currently_running_job_id: int | None = None

Job ID of a currently running sync, if any. Only populated when with_connection_status=True.

currently_running_job_start_time: str | None = None

ISO 8601 timestamp of when the currently running sync started. Only populated when with_connection_status=True.

class CloudSourceDetails(pydantic.main.BaseModel):
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    # All fields are required strings. Unlike `CloudSourceResult`, the field names
    # carry a `source_` prefix and the connector definition ID is included.
    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""

Detailed information about a deployed source connector in Airbyte Cloud.

source_id: str = PydanticUndefined

The source ID.

source_name: str = PydanticUndefined

Display name of the source.

source_url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'source-postgres').

class CloudDestinationDetails(pydantic.main.BaseModel):
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    # All fields are required strings. Unlike `CloudDestinationResult`, the field
    # names carry a `destination_` prefix and the connector definition ID is included.
    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""

Detailed information about a deployed destination connector in Airbyte Cloud.

destination_id: str = PydanticUndefined

The destination ID.

destination_name: str = PydanticUndefined

Display name of the destination.

destination_url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'destination-snowflake').

class CloudConnectionDetails(pydantic.main.BaseModel):
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    # Connection identity.
    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    # Source and destination linked by this connection (IDs plus display names).
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    # Sync settings.
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    # Nullable but declared without a default, so the field must still be passed
    # explicitly (None is an accepted value).
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""

Detailed information about a deployed connection in Airbyte Cloud.

connection_id: str = PydanticUndefined

The connection ID.

connection_name: str = PydanticUndefined

Display name of the connection.

connection_url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

source_name: str = PydanticUndefined

Display name of the source.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

destination_name: str = PydanticUndefined

Display name of the destination.

selected_streams: list[str] = PydanticUndefined

List of stream names selected for syncing.

table_prefix: str | None = PydanticUndefined

Table prefix applied when syncing to the destination.

class CloudOrganizationResult(pydantic.main.BaseModel):
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    # All three fields are plain `str` with no default, so each one is required
    # when the model is constructed.
    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""

Information about an organization in Airbyte Cloud.

id: str = PydanticUndefined

The organization ID.

name: str = PydanticUndefined

Display name of the organization.

email: str = PydanticUndefined

Email associated with the organization.

class CloudWorkspaceResult(pydantic.main.BaseModel):
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    # All three fields are plain `str` with no default, so each one is required
    # when the model is constructed.
    id: str
    """The workspace ID."""
    name: str
    """Display name of the workspace."""
    organization_id: str
    """ID of the organization this workspace belongs to."""

Information about a workspace in Airbyte Cloud.

id: str = PydanticUndefined

The workspace ID.

name: str = PydanticUndefined

Display name of the workspace.

organization_id: str = PydanticUndefined

ID of the organization this workspace belongs to.

class LogReadResult(pydantic.main.BaseModel):
164class LogReadResult(BaseModel):
165    """Result of reading sync logs with pagination support."""
166
167    job_id: int
168    """The job ID the logs belong to."""
169    attempt_number: int
170    """The attempt number the logs belong to."""
171    log_text: str
172    """The string containing the log text we are returning."""
173    log_text_start_line: int
174    """1-based line index of the first line returned."""
175    log_text_line_count: int
176    """Count of lines we are returning."""
177    total_log_lines_available: int
178    """Total number of log lines available, shows if any lines were missed due to the limit."""

Result of reading sync logs with pagination support.

job_id: int = PydanticUndefined

The job ID the logs belong to.

attempt_number: int = PydanticUndefined

The attempt number the logs belong to.

log_text: str = PydanticUndefined

The string containing the log text we are returning.

log_text_start_line: int = PydanticUndefined

1-based line index of the first line returned.

log_text_line_count: int = PydanticUndefined

Count of lines we are returning.

total_log_lines_available: int = PydanticUndefined

Total number of log lines available, shows if any lines were missed due to the limit.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')], source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    source_name: Annotated[str, Field(description="The name to use when deploying the source.")],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    config: Annotated[
        dict | str | None,
        Field(description="The configuration for the source connector.", default=None),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(description="The name of the secret containing the configuration.", default=None),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name.", default=True),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # Build a local source object without an executor; it is used here for its
    # config spec and as the payload handed to the workspace for deployment.
    local_source = get_source(source_connector_name, no_executor=True)

    # Resolve the config against the connector's own spec and validate it before
    # the workspace is contacted.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=local_source.config_spec,
    )
    local_source.set_config(resolved_config, validate=True)

    cloud_source = _get_cloud_workspace(workspace_id).deploy_source(
        name=source_name,
        source=local_source,
        unique=unique,
    )

    # Record the new ID as created in this session (the counterpart of the
    # `check_guid_created_in_session` guard used by the destructive tools).
    register_guid_created_in_session(cloud_source.connector_id)

    summary = f"Successfully deployed source '{source_name}' with ID '{cloud_source.connector_id}'"
    return summary + f" and URL: {cloud_source.connector_url}"

Deploy a source connector to Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_destination_to_cloud( destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')], destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    # Build a local destination object without an executor; it is used here for
    # its config spec and as the payload handed to the workspace for deployment.
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    # Resolve the config against the connector's own spec and validate it before
    # the workspace is contacted.
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    # Record the new ID as created in this session (the counterpart of the
    # `check_guid_created_in_session` guard used by the destructive tools).
    register_guid_created_in_session(deployed_destination.connector_id)
    # Report the ID and the web URL, in the same format as the other deploy tools
    # (`deploy_source_to_cloud`, `deploy_noop_destination_to_cloud`).
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy a destination connector to Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud( connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')], source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')], destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')], selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    connection_name: Annotated[str, Field(description="The name of the connection.")],
    source_id: Annotated[str, Field(description="The ID of the deployed source.")],
    destination_id: Annotated[str, Field(description="The ID of the deployed destination.")],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            ),
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    # Normalize the stream selection to a list first, before the workspace is resolved.
    streams: list[str] = resolve_list_of_strings(selected_streams)

    # The source and destination are passed by ID rather than as objects.
    new_connection = _get_cloud_workspace(workspace_id).deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=streams,
        table_prefix=table_prefix,
    )

    # Record the new ID as created in this session (the counterpart of the
    # `check_guid_created_in_session` guard used by the destructive tools).
    register_guid_created_in_session(new_connection.connection_id)

    summary = f"Successfully created connection '{connection_name}' "
    summary += f"with ID '{new_connection.connection_id}' and "
    return summary + f"URL: {new_connection.connection_url}"

Create a connection between a deployed source and destination on Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.')], wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    connection_id: Annotated[str, Field(description="The ID of the Airbyte Cloud connection.")],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(description="Maximum time to wait for sync completion (seconds).", default=300),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    cloud_connection = _get_cloud_workspace(workspace_id).get_connection(
        connection_id=connection_id,
    )
    job = cloud_connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Fire-and-forget: report the job handle without querying its status.
    if not wait:
        return (
            f"Sync started. Job ID is '{job.job_id}' and "
            f"job URL is: {job.job_url}"
        )

    # The caller asked to wait, so include the job status in the report.
    final_status = job.get_job_status()
    return (
        f"Sync completed with status: {final_status}. "
        f"Job ID is '{job.job_id}' and "
        f"job URL is: {job.job_url}"
    )

Run a sync job on Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def check_airbyte_cloud_workspace( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace ID and workspace URL for verification.
    """
    cloud_workspace = _get_cloud_workspace(workspace_id)
    # The success report below is only built after `connect()` returns.
    cloud_workspace.connect()

    # One line per fact, joined with newlines.
    report_lines = [
        "✅ Successfully connected to Airbyte Cloud workspace.",
        f"Workspace ID: {cloud_workspace.workspace_id}",
        f"Workspace URL: {cloud_workspace.workspace_url}",
    ]
    return "\n".join(report_lines)

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace ID and workspace URL for verification.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_noop_destination_to_cloud( name: str = 'No-op Destination', *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], unique: bool = True) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes.

    The deployed connector's ID is registered as created in this session, and the
    returned message reports its ID and URL.
    """
    noop_destination = get_noop_destination()
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    deployed = cloud_workspace.deploy_destination(
        name=name,
        destination=noop_destination,
        unique=unique,
    )

    # Record the new connector's GUID in the session registry.
    new_connector_id = deployed.connector_id
    register_guid_created_in_session(new_connector_id)

    return (
        "Successfully deployed No-op Destination "
        f"with ID '{new_connector_id}' and "
        f"URL: {deployed.connector_url}"
    )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from Airbyte Cloud.

    Returns a dict with the job status, job ID, bytes and records synced, start time
    (ISO 8601), and job URL. The `attempts` key is always present; it is populated only
    when `include_attempts` is true. If no sync result is found, only `status`, `job_id`
    and `attempts` are returned, with empty values.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # `job_id` is forwarded as-is; when it is None the lookup is left to choose the job
    # (the latest one, per the parameter description).
    job_result: cloud.SyncResult | None = cloud_connection.get_sync_result(job_id=job_id)
    if not job_result:
        return {"status": None, "job_id": None, "attempts": []}

    status_info: dict[str, Any] = {
        "status": job_result.get_job_status(),
        "job_id": job_result.job_id,
        "bytes_synced": job_result.bytes_synced,
        "records_synced": job_result.records_synced,
        "start_time": job_result.start_time.isoformat(),
        "job_url": job_result.job_url,
        "attempts": [],
    }

    # Attempt details are fetched only on request.
    if not include_attempts:
        return status_info

    status_info["attempts"] = [
        {
            "attempt_number": job_attempt.attempt_number,
            "attempt_id": job_attempt.attempt_id,
            "status": job_attempt.status,
            "bytes_synced": job_attempt.bytes_synced,
            "records_synced": job_attempt.records_synced,
            "created_at": job_attempt.created_at.isoformat(),
        }
        for job_attempt in job_result.get_attempts()
    ]
    return status_info

Get the status of a sync job from Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter sources by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudSourceResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List the source connectors deployed in the Airbyte Cloud workspace.

    The optional name filter is applied first, then the optional item limit.
    """

    def _to_result(deployed_source: Any) -> CloudSourceResult:
        # Name and URL are expected to be non-null in list API responses, hence the casts.
        return CloudSourceResult(
            id=deployed_source.source_id,
            name=cast(str, deployed_source.name),
            url=cast(str, deployed_source.connector_url),
        )

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    selected = cloud_workspace.list_sources()

    if name_contains:
        # Case-insensitive substring match; sources without a name never match.
        lowered_fragment = name_contains.lower()
        selected = [
            candidate
            for candidate in selected
            if candidate.name is not None and lowered_fragment in candidate.name.lower()
        ]

    if max_items_limit is not None:
        selected = selected[:max_items_limit]

    return [_to_result(deployed_source) for deployed_source in selected]

List all deployed source connectors in the Airbyte Cloud workspace.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter destinations by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudDestinationResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List the destination connectors deployed in the Airbyte Cloud workspace.

    The optional name filter is applied first, then the optional item limit.
    """

    def _to_result(deployed_destination: Any) -> CloudDestinationResult:
        # Name and URL are expected to be non-null in list API responses, hence the casts.
        return CloudDestinationResult(
            id=deployed_destination.destination_id,
            name=cast(str, deployed_destination.name),
            url=cast(str, deployed_destination.connector_url),
        )

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    selected = cloud_workspace.list_destinations()

    if name_contains:
        # Case-insensitive substring match; destinations without a name never match.
        lowered_fragment = name_contains.lower()
        selected = [
            candidate
            for candidate in selected
            if candidate.name is not None and lowered_fragment in candidate.name.lower()
        ]

    if max_items_limit is not None:
        selected = selected[:max_items_limit]

    return [_to_result(deployed_destination) for deployed_destination in selected]

List all deployed destination connectors in the Airbyte Cloud workspace.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the source to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudSourceDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    source_id: Annotated[str, Field(description="The ID of the source to describe.")],
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector.

    The result carries the source's ID, name, URL, and connector definition ID.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # Read `name` before touching `_connector_info`: accessing the property is what
    # ensures `_connector_info` is populated.
    resolved_name = cast(str, cloud_source.name)

    resolved_id = cloud_source.source_id
    resolved_url = cloud_source.connector_url
    definition_id = cloud_source._connector_info.definition_id  # noqa: SLF001  # type: ignore[union-attr]

    return CloudSourceDetails(
        source_id=resolved_id,
        source_name=resolved_name,
        source_url=resolved_url,
        connector_definition_id=definition_id,
    )

Get detailed information about a specific deployed source connector.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the destination to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudDestinationDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    destination_id: Annotated[str, Field(description="The ID of the destination to describe.")],
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector.

    The result carries the destination's ID, name, URL, and connector definition ID.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)

    # Read `name` before touching `_connector_info`: accessing the property is what
    # ensures `_connector_info` is populated.
    resolved_name = cast(str, cloud_destination.name)

    resolved_id = cloud_destination.destination_id
    resolved_url = cloud_destination.connector_url
    definition_id = cloud_destination._connector_info.definition_id  # noqa: SLF001  # type: ignore[union-attr]

    return CloudDestinationDetails(
        destination_id=resolved_id,
        destination_name=resolved_name,
        destination_url=resolved_url,
        connector_definition_id=definition_id,
    )

Get detailed information about a specific deployed destination connector.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudConnectionDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    connection_id: Annotated[str, Field(description="The ID of the connection to describe.")],
    *,
    workspace_id: Annotated[str | None, Field(description=WORKSPACE_ID_TIP_TEXT, default=None)],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection.

    The result includes the connection's ID, name and URL, the IDs and names of its
    source and destination, its selected stream names, and its table prefix.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # Connection-level fields.
    resolved_connection_id = cloud_connection.connection_id
    resolved_connection_name = cast(str, cloud_connection.name)
    resolved_connection_url = cast(str, cloud_connection.connection_url)

    # Source and destination endpoints of the connection.
    resolved_source_id = cloud_connection.source_id
    resolved_source_name = cast(str, cloud_connection.source.name)
    resolved_destination_id = cloud_connection.destination_id
    resolved_destination_name = cast(str, cloud_connection.destination.name)

    # Stream selection and table naming.
    stream_names = cloud_connection.stream_names
    prefix = cloud_connection.table_prefix

    return CloudConnectionDetails(
        connection_id=resolved_connection_id,
        connection_name=resolved_connection_name,
        connection_url=resolved_connection_url,
        source_id=resolved_source_id,
        source_name=resolved_source_name,
        destination_id=resolved_destination_id,
        destination_name=resolved_destination_name,
        selected_streams=stream_names,
        table_prefix=prefix,
    )

Get detailed information about a specific deployed connection.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_logs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_lines: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=4000, description="Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.")], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`.")], line_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`.')]) -> LogReadResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    The attempt is chosen by `attempt_number` when given; otherwise the attempt with the
    highest attempt number is used. The log text is then windowed to at most `max_lines`
    lines (`0` means no limit), taken either from the end of the log (`from_tail`) or
    starting after `line_offset` lines. The result reports the 1-based line number of the
    first returned line, the number of lines returned, and the total lines available.

    Raises:
        PyAirbyteInputError: If `line_offset` is combined with `from_tail=True`, or if
            `max_lines` or `line_offset` is negative.
        AirbyteMissingResourceError: If no sync job is found, the job has no attempts, or
            the requested attempt number does not exist for the job.
    """
    # `line_offset` and `from_tail=True` select the window from opposite ends.
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # Negative values would silently turn into Python negative-index slices and yield
    # nonsensical windows and start-line numbers, so reject them up front.
    if max_lines < 0:
        raise PyAirbyteInputError(
            message="The 'max_lines' parameter must be zero or a positive integer.",
            context={"max_lines": max_lines},
        )
    if line_offset is not None and line_offset < 0:
        raise PyAirbyteInputError(
            message="The 'line_offset' parameter must be zero or a positive integer.",
            context={"line_offset": line_offset},
        )

    # With neither option given, read from the tail of the log.
    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()
    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is None:
        # No attempt requested: use the one with the highest attempt number.
        target_attempt = max(attempts, key=lambda a: a.attempt_number)
    else:
        target_attempt = next(
            (a for a in attempts if a.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )

    logs = target_attempt.get_full_log_text()

    if not logs:
        # No log text: return a placeholder message with zero line counts.
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # A `max_lines` of 0 means "no limit", i.e. every available line.
    effective_max = total_lines if max_lines == 0 else max_lines

    if from_tail:
        # Keep the last `effective_max` lines (or all of them if there are fewer).
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:]
    else:
        # Skip `line_offset` lines from the start, then take up to `effective_max`.
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )

Get the logs from a sync job attempt on Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_connections( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter connections by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')], with_connection_status: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description="If True, include status info for each connection's most recent sync job")], failing_connections_only: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description='If True, only return connections with failed/cancelled last sync')]) -> list[CloudConnectionResult]:
 957@mcp_tool(
 958    domain="cloud",
 959    read_only=True,
 960    idempotent=True,
 961    open_world=True,
 962    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 963)
 964def list_deployed_cloud_connections(
 965    *,
 966    workspace_id: Annotated[
 967        str | None,
 968        Field(
 969            description=WORKSPACE_ID_TIP_TEXT,
 970            default=None,
 971        ),
 972    ],
 973    name_contains: Annotated[
 974        str | None,
 975        Field(
 976            description="Optional case-insensitive substring to filter connections by name",
 977            default=None,
 978        ),
 979    ],
 980    max_items_limit: Annotated[
 981        int | None,
 982        Field(
 983            description="Optional maximum number of items to return (default: no limit)",
 984            default=None,
 985        ),
 986    ],
 987    with_connection_status: Annotated[
 988        bool | None,
 989        Field(
 990            description="If True, include status info for each connection's most recent sync job",
 991            default=False,
 992        ),
 993    ],
 994    failing_connections_only: Annotated[
 995        bool | None,
 996        Field(
 997            description="If True, only return connections with failed/cancelled last sync",
 998            default=False,
 999        ),
1000    ],
1001) -> list[CloudConnectionResult]:
1002    """List all deployed connections in the Airbyte Cloud workspace.
1003
1004    When with_connection_status is True, each connection result will include
1005    information about the most recent sync job status, skipping over any
1006    currently in-progress syncs to find the last completed job.
1007
1008    When failing_connections_only is True, only connections where the most
1009    recent completed sync job failed or was cancelled will be returned.
1010    This implicitly enables with_connection_status.
1011    """
1012    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1013    connections = workspace.list_connections()
1014
1015    # Filter by name if requested
1016    if name_contains:
1017        needle = name_contains.lower()
1018        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]
1019
1020    # If failing_connections_only is True, implicitly enable with_connection_status
1021    if failing_connections_only:
1022        with_connection_status = True
1023
1024    results: list[CloudConnectionResult] = []
1025
1026    for connection in connections:
1027        last_job_status: str | None = None
1028        last_job_id: int | None = None
1029        last_job_time: str | None = None
1030        currently_running_job_id: int | None = None
1031        currently_running_job_start_time: str | None = None
1032
1033        if with_connection_status:
1034            sync_logs = connection.get_previous_sync_logs(limit=5)
1035            last_completed_job_status = None  # Keep enum for comparison
1036
1037            for sync_result in sync_logs:
1038                job_status = sync_result.get_job_status()
1039
1040                if not sync_result.is_job_complete():
1041                    currently_running_job_id = sync_result.job_id
1042                    currently_running_job_start_time = sync_result.start_time.isoformat()
1043                    continue
1044
1045                last_completed_job_status = job_status
1046                last_job_status = str(job_status.value) if job_status else None
1047                last_job_id = sync_result.job_id
1048                last_job_time = sync_result.start_time.isoformat()
1049                break
1050
1051            if failing_connections_only and (
1052                last_completed_job_status is None
1053                or last_completed_job_status not in FAILED_STATUSES
1054            ):
1055                continue
1056
1057        results.append(
1058            CloudConnectionResult(
1059                id=connection.connection_id,
1060                name=cast(str, connection.name),
1061                url=cast(str, connection.connection_url),
1062                source_id=connection.source_id,
1063                destination_id=connection.destination_id,
1064                last_job_status=last_job_status,
1065                last_job_id=last_job_id,
1066                last_job_time=last_job_time,
1067                currently_running_job_id=currently_running_job_id,
1068                currently_running_job_start_time=currently_running_job_start_time,
1069            )
1070        )
1071
1072        if max_items_limit is not None and len(results) >= max_items_limit:
1073            break
1074
1075    return results

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_workspaces( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional substring to filter workspaces by name (server-side filtering)')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudWorkspaceResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List the workspaces that belong to one specific organization.

    Either organization_id OR organization_name (exact match) must be provided.
    This tool does NOT list workspaces across all organizations - the organization
    to list workspaces from must be specified.
    """

    def _to_result(raw_workspace: Any) -> CloudWorkspaceResult:
        # Keys missing from the API record fall back to empty strings.
        return CloudWorkspaceResult(
            id=raw_workspace.get("workspaceId", ""),
            name=raw_workspace.get("name", ""),
            organization_id=raw_workspace.get("organizationId", ""),
        )

    cloud_api_root = resolve_cloud_api_url()
    cloud_client_id = resolve_cloud_client_id()
    cloud_client_secret = resolve_cloud_client_secret()

    # Turn whichever identifier was supplied (ID or exact name) into an organization ID.
    target_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=cloud_api_root,
        client_id=cloud_client_id,
        client_secret=cloud_client_secret,
    )

    # The name filter and item limit are passed through to the API helper.
    raw_workspaces = api_util.list_workspaces_in_organization(
        organization_id=target_org_id,
        api_root=cloud_api_root,
        client_id=cloud_client_id,
        client_secret=cloud_client_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [_to_result(raw_workspace) for raw_workspace in raw_workspaces]

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_organization( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')]) -> CloudOrganizationResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details (ID, name, email) about a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    # Resolve the API root and client credentials once, in the same order the
    # lookup helper receives them.
    connection_kwargs = {
        "api_root": resolve_cloud_api_url(),
        "client_id": resolve_cloud_client_id(),
        "client_secret": resolve_cloud_client_secret(),
    }

    # The lookup itself (by ID or by exact name) is delegated to the module helper.
    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        **connection_kwargs,
    )

    return CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
    )

Get details about a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool is useful for looking up an organization's ID from its name, or vice versa.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def publish_custom_source_definition( name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Returns a human-readable confirmation that includes a description of the
    newly published definition.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A string without any newline is interpreted as a file path rather than as
    # inline YAML; `Path` and `None` inputs are forwarded untouched.
    manifest = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    published = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest,
        unique=unique,
        pre_validate=pre_validate,
    )

    # Record the new definition ID for this session.
    # NOTE(review): presumably this is what lets tools guarded by
    # `check_guid_created_in_session` act on it later - confirm in `_tool_utils`.
    register_guid_created_in_session(published.definition_id)

    summary = _get_custom_source_definition_description(custom_source=published)
    return "Successfully published custom YAML source definition:\n" + summary + "\n"

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    # Same auth hint as the sibling cloud tools (e.g. `publish_custom_source_definition`),
    # which obtain their workspace the same way.
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Returns one dict per definition with the keys `definition_id`, `name`,
    `version`, and `connector_builder_project_url`.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Only the "yaml" definition type is requested; see the note in the docstring.
    return [
        {
            "definition_id": definition.definition_id,
            "name": definition.name,
            "version": definition.version,
            "connector_builder_project_url": definition.connector_builder_project_url,
        }
        for definition in workspace.list_custom_source_definitions(definition_type="yaml")
    ]

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path, FieldInfo(annotation=NoneType, required=True, description='New manifest as YAML string or file path.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    # Same auth hint as the sibling cloud tools (e.g. `publish_custom_source_definition`),
    # which obtain their workspace the same way.
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path,
        Field(
            description="New manifest as YAML string or file path.",
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Returns a human-readable confirmation that includes a description of the
    updated definition.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Session guard runs first, before any workspace/API access.
    # NOTE(review): presumably rejects IDs not registered during this session -
    # confirm in `_tool_utils`.
    check_guid_created_in_session(definition_id)

    # A string without any newline is interpreted as a file path rather than as
    # inline YAML; `Path` inputs are forwarded untouched.
    manifest = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    existing_definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    updated: CustomCloudSourceDefinition = existing_definition.update_definition(
        manifest_yaml=manifest,
        pre_validate=pre_validate,
    )

    summary = _get_custom_source_definition_description(custom_source=updated)
    return "Successfully updated custom YAML source definition:\n" + summary

Update a custom YAML source definition in Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def permanently_delete_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the custom source definition (for verification).')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    # Same auth hint as the sibling cloud tools (e.g. `permanently_delete_cloud_source`),
    # which also operate through `_get_cloud_workspace`.
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the definition's actual name.
    """
    # Session guard runs first, before any workspace/API access.
    # NOTE(review): presumably rejects IDs not registered during this session -
    # confirm in `_tool_utils`.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = definition.name

    # The caller must echo the exact current name before anything is deleted.
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded for extra protection when running in LLM agents.
    definition.permanently_delete(safe_mode=True)
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the source (for verification).')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the source's actual name.
    """
    # Session guard runs first, before any workspace/API access.
    check_guid_created_in_session(source_id)

    # No workspace ID is passed, so the helper's default workspace is used.
    workspace: CloudWorkspace = _get_cloud_workspace()
    deployed_source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, deployed_source.name)

    # The caller must echo the exact current name before anything is deleted.
    if actual_name != name:
        mismatch_context = {
            "source_id": source_id,
            "expected_name": name,
            "actual_name": actual_name,
        }
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context=mismatch_context,
        )

    # Safe mode (name must contain "delete-me" or "deleteme", case insensitive) is
    # hard-coded to True for extra protection when running in LLM agents.
    workspace.permanently_delete_source(source=source_id, safe_mode=True)
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.

The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the destination (for verification).')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the destination's actual name.
    """
    # Session guard runs first, before any workspace/API access.
    check_guid_created_in_session(destination_id)

    # No workspace ID is passed, so the helper's default workspace is used.
    workspace: CloudWorkspace = _get_cloud_workspace()
    deployed_destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, deployed_destination.name)

    # The caller must echo the exact current name before anything is deleted.
    if actual_name != name:
        mismatch_context = {
            "destination_id": destination_id,
            "expected_name": name,
            "actual_name": actual_name,
        }
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context=mismatch_context,
        )

    # Safe mode (name-based delete disposition: "delete-me" or "deleteme") is
    # hard-coded to True for extra protection when running in LLM agents.
    workspace.permanently_delete_destination(destination=destination_id, safe_mode=True)
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.

The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the connection (for verification).')], *, cascade_delete_source: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the source connector associated with this connection.')] = False, cascade_delete_destination: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the destination connector associated with this connection.')] = False) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Raises:
        PyAirbyteInputError: If `name` does not exactly match the connection's actual name.
    """
    # Session guard runs first, before any workspace/API access.
    check_guid_created_in_session(connection_id)

    # No workspace ID is passed, so the helper's default workspace is used.
    workspace: CloudWorkspace = _get_cloud_workspace()
    cloud_connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, cloud_connection.name)

    # The caller must echo the exact current name before anything is deleted.
    if actual_name != name:
        mismatch_context = {
            "connection_id": connection_id,
            "expected_name": name,
            "actual_name": actual_name,
        }
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context=mismatch_context,
        )

    # Safe mode (name-based delete disposition: "delete-me" or "deleteme") is
    # hard-coded to True for extra protection when running in LLM agents.
    workspace.permanently_delete_connection(
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
        safe_mode=True,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.

The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud.

    Returns a confirmation message that includes the source's connector URL.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = workspace.get_source(source_id=source_id)
    deployed_source.rename(name=name)

    # The URL is read only after the rename call has returned.
    source_url = deployed_source.connector_url
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source_url}"

Rename a deployed source connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.

    Returns a confirmation message that includes the source's connector URL.
    """
    # Session guard runs first, before any workspace/API access.
    check_guid_created_in_session(source_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = workspace.get_source(source_id=source_id)

    # The connector spec is not available here, so no JSON schema is supplied
    # when the config inputs are resolved.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    deployed_source.update_config(config=resolved_config)

    source_url = deployed_source.connector_url
    return f"Successfully updated source '{source_id}'. URL: {source_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud.

    Returns a confirmation message that includes the destination's connector URL.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.get_destination(destination_id=destination_id)
    deployed_destination.rename(name=name)

    # The URL is read only after the rename call has returned.
    destination_url = deployed_destination.connector_url
    return f"Successfully renamed destination '{destination_id}' to '{name}'. URL: {destination_url}"

Rename a deployed destination connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_destination_config( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Session guard runs before any Cloud lookup; presumably it rejects IDs that
    # were not created in this session -- confirm against `_tool_utils`.
    check_guid_created_in_session(destination_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)

    # No connector spec is available at this point, so no JSON schema is passed
    # to the config resolver.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_destination.update_config(config=resolved_config)

    destination_url = cloud_destination.connector_url
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination_url}"
    )

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    # Resolve the workspace first, then fetch the connection by ID inside it.
    cloud_connection = _get_cloud_workspace(workspace_id).get_connection(
        connection_id=connection_id,
    )
    cloud_connection.rename(name=name)

    # The connection URL is only read once the rename call has returned.
    connection_url = cloud_connection.connection_url
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection_url}"
    )

Rename a connection on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # Session guard runs before any Cloud lookup; presumably it rejects IDs that
    # were not created in this session -- confirm against `_tool_utils`.
    check_guid_created_in_session(connection_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    cloud_connection.set_table_prefix(prefix=prefix)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection_url}"
    )

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # Session guard runs before any Cloud lookup; presumably it rejects IDs that
    # were not created in this session -- confirm against `_tool_utils`.
    check_guid_created_in_session(connection_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # Normalize the `str | list[str]` input before handing it to the connection.
    selected_stream_names: list[str] = resolve_list_of_strings(stream_names)
    cloud_connection.set_selected_streams(stream_names=selected_stream_names)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {selected_stream_names}. URL: {connection_url}"
    )

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.