airbyte.mcp.cloud_ops

Airbyte Cloud MCP operations.

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any, Literal, cast
   6
   7from fastmcp import FastMCP
   8from pydantic import BaseModel, Field
   9
  10from airbyte import cloud, get_destination, get_source
  11from airbyte._util import api_util
  12from airbyte.cloud.connectors import CustomCloudSourceDefinition
  13from airbyte.cloud.constants import FAILED_STATUSES
  14from airbyte.cloud.workspaces import CloudWorkspace
  15from airbyte.destinations.util import get_noop_destination
  16from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  17from airbyte.mcp._tool_utils import (
  18    check_guid_created_in_session,
  19    mcp_tool,
  20    register_guid_created_in_session,
  21    register_tools,
  22)
  23from airbyte.mcp._util import (
  24    resolve_cloud_credentials,
  25    resolve_config,
  26    resolve_list_of_strings,
  27    resolve_workspace_id,
  28)
  29from airbyte.secrets import SecretString
  30
  31
# Shared help-text snippets appended to the MCP tool descriptions below.
CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
# Reused description for the optional `workspace_id` parameter accepted by each tool.
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  38
  39
# Summary row returned by list_deployed_cloud_source_connectors.
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
  49
  50
# Summary row returned by list_deployed_cloud_destination_connectors.
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
  60
  61
# Summary row for a connection. The job-related fields default to None and,
# per their docstrings, are filled only when connection status was requested.
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
  89
  90
# Detailed view returned by describe_cloud_source.
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 102
 103
# Detailed view returned by describe_cloud_destination.
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 115
 116
# Detailed view returned by describe_cloud_connection.
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
 138
 139
# Minimal organization summary exposed to MCP clients.
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
 149
 150
# Returned by check_airbyte_cloud_workspace. Note: organization_id may hold a
# human-readable placeholder string when the caller lacks ORGANIZATION_READER
# permission (see check_airbyte_cloud_workspace).
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
 164
 165
# Paged slice of sync-job log text; the start-line/line-count/total fields let
# callers request subsequent ranges and detect truncation.
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""
 181
 182
# Single job entry inside SyncJobListResult (see list_cloud_sync_jobs).
class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""
 198
 199
# Envelope returned by list_cloud_sync_jobs; echoes the pagination inputs so
# callers can issue follow-up requests.
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 211
 212
 213def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
 214    """Get an authenticated CloudWorkspace.
 215
 216    Resolves credentials from multiple sources in order:
 217    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 218    2. Environment variables
 219
 220    Args:
 221        workspace_id: Optional workspace ID. If not provided, uses HTTP headers
 222            or the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
 223    """
 224    credentials = resolve_cloud_credentials()
 225    resolved_workspace_id = resolve_workspace_id(workspace_id)
 226
 227    return CloudWorkspace(
 228        workspace_id=resolved_workspace_id,
 229        client_id=credentials.client_id,
 230        client_secret=credentials.client_secret,
 231        bearer_token=credentials.bearer_token,
 232        api_root=credentials.api_root,
 233    )
 234
 235
 236@mcp_tool(
 237    domain="cloud",
 238    open_world=True,
 239    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 240)
 241def deploy_source_to_cloud(
 242    source_name: Annotated[
 243        str,
 244        Field(description="The name to use when deploying the source."),
 245    ],
 246    source_connector_name: Annotated[
 247        str,
 248        Field(description="The name of the source connector (e.g., 'source-faker')."),
 249    ],
 250    *,
 251    workspace_id: Annotated[
 252        str | None,
 253        Field(
 254            description=WORKSPACE_ID_TIP_TEXT,
 255            default=None,
 256        ),
 257    ],
 258    config: Annotated[
 259        dict | str | None,
 260        Field(
 261            description="The configuration for the source connector.",
 262            default=None,
 263        ),
 264    ],
 265    config_secret_name: Annotated[
 266        str | None,
 267        Field(
 268            description="The name of the secret containing the configuration.",
 269            default=None,
 270        ),
 271    ],
 272    unique: Annotated[
 273        bool,
 274        Field(
 275            description="Whether to require a unique name.",
 276            default=True,
 277        ),
 278    ],
 279) -> str:
 280    """Deploy a source connector to Airbyte Cloud."""
 281    source = get_source(
 282        source_connector_name,
 283        no_executor=True,
 284    )
 285    config_dict = resolve_config(
 286        config=config,
 287        config_secret_name=config_secret_name,
 288        config_spec_jsonschema=source.config_spec,
 289    )
 290    source.set_config(config_dict, validate=True)
 291
 292    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 293    deployed_source = workspace.deploy_source(
 294        name=source_name,
 295        source=source,
 296        unique=unique,
 297    )
 298
 299    register_guid_created_in_session(deployed_source.connector_id)
 300    return (
 301        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 302        f" and URL: {deployed_source.connector_url}"
 303    )
 304
 305
 306@mcp_tool(
 307    domain="cloud",
 308    open_world=True,
 309    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 310)
 311def deploy_destination_to_cloud(
 312    destination_name: Annotated[
 313        str,
 314        Field(description="The name to use when deploying the destination."),
 315    ],
 316    destination_connector_name: Annotated[
 317        str,
 318        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 319    ],
 320    *,
 321    workspace_id: Annotated[
 322        str | None,
 323        Field(
 324            description=WORKSPACE_ID_TIP_TEXT,
 325            default=None,
 326        ),
 327    ],
 328    config: Annotated[
 329        dict | str | None,
 330        Field(
 331            description="The configuration for the destination connector.",
 332            default=None,
 333        ),
 334    ],
 335    config_secret_name: Annotated[
 336        str | None,
 337        Field(
 338            description="The name of the secret containing the configuration.",
 339            default=None,
 340        ),
 341    ],
 342    unique: Annotated[
 343        bool,
 344        Field(
 345            description="Whether to require a unique name.",
 346            default=True,
 347        ),
 348    ],
 349) -> str:
 350    """Deploy a destination connector to Airbyte Cloud."""
 351    destination = get_destination(
 352        destination_connector_name,
 353        no_executor=True,
 354    )
 355    config_dict = resolve_config(
 356        config=config,
 357        config_secret_name=config_secret_name,
 358        config_spec_jsonschema=destination.config_spec,
 359    )
 360    destination.set_config(config_dict, validate=True)
 361
 362    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 363    deployed_destination = workspace.deploy_destination(
 364        name=destination_name,
 365        destination=destination,
 366        unique=unique,
 367    )
 368
 369    register_guid_created_in_session(deployed_destination.connector_id)
 370    return (
 371        f"Successfully deployed destination '{destination_name}' "
 372        f"with ID: {deployed_destination.connector_id}"
 373    )
 374
 375
 376@mcp_tool(
 377    domain="cloud",
 378    open_world=True,
 379    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 380)
 381def create_connection_on_cloud(
 382    connection_name: Annotated[
 383        str,
 384        Field(description="The name of the connection."),
 385    ],
 386    source_id: Annotated[
 387        str,
 388        Field(description="The ID of the deployed source."),
 389    ],
 390    destination_id: Annotated[
 391        str,
 392        Field(description="The ID of the deployed destination."),
 393    ],
 394    selected_streams: Annotated[
 395        str | list[str],
 396        Field(
 397            description=(
 398                "The selected stream names to sync within the connection. "
 399                "Must be an explicit stream name or list of streams. "
 400                "Cannot be empty or '*'."
 401            )
 402        ),
 403    ],
 404    *,
 405    workspace_id: Annotated[
 406        str | None,
 407        Field(
 408            description=WORKSPACE_ID_TIP_TEXT,
 409            default=None,
 410        ),
 411    ],
 412    table_prefix: Annotated[
 413        str | None,
 414        Field(
 415            description="Optional table prefix to use when syncing to the destination.",
 416            default=None,
 417        ),
 418    ],
 419) -> str:
 420    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 421    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 422    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 423    deployed_connection = workspace.deploy_connection(
 424        connection_name=connection_name,
 425        source=source_id,
 426        destination=destination_id,
 427        selected_streams=resolved_streams_list,
 428        table_prefix=table_prefix,
 429    )
 430
 431    register_guid_created_in_session(deployed_connection.connection_id)
 432    return (
 433        f"Successfully created connection '{connection_name}' "
 434        f"with ID '{deployed_connection.connection_id}' and "
 435        f"URL: {deployed_connection.connection_url}"
 436    )
 437
 438
 439@mcp_tool(
 440    domain="cloud",
 441    open_world=True,
 442    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 443)
 444def run_cloud_sync(
 445    connection_id: Annotated[
 446        str,
 447        Field(description="The ID of the Airbyte Cloud connection."),
 448    ],
 449    *,
 450    workspace_id: Annotated[
 451        str | None,
 452        Field(
 453            description=WORKSPACE_ID_TIP_TEXT,
 454            default=None,
 455        ),
 456    ],
 457    wait: Annotated[
 458        bool,
 459        Field(
 460            description=(
 461                "Whether to wait for the sync to complete. Since a sync can take between several "
 462                "minutes and several hours, this option is not recommended for most "
 463                "scenarios."
 464            ),
 465            default=False,
 466        ),
 467    ],
 468    wait_timeout: Annotated[
 469        int,
 470        Field(
 471            description="Maximum time to wait for sync completion (seconds).",
 472            default=300,
 473        ),
 474    ],
 475) -> str:
 476    """Run a sync job on Airbyte Cloud."""
 477    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 478    connection = workspace.get_connection(connection_id=connection_id)
 479    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 480
 481    if wait:
 482        status = sync_result.get_job_status()
 483        return (
 484            f"Sync completed with status: {status}. "
 485            f"Job ID is '{sync_result.job_id}' and "
 486            f"job URL is: {sync_result.job_url}"
 487        )
 488    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 489
 490
 491@mcp_tool(
 492    domain="cloud",
 493    read_only=True,
 494    idempotent=True,
 495    open_world=True,
 496    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 497)
 498def check_airbyte_cloud_workspace(
 499    *,
 500    workspace_id: Annotated[
 501        str | None,
 502        Field(
 503            description=WORKSPACE_ID_TIP_TEXT,
 504            default=None,
 505        ),
 506    ],
 507) -> CloudWorkspaceResult:
 508    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 509
 510    Returns workspace details including workspace ID, name, and organization info.
 511    """
 512    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 513
 514    # Get workspace details from the public API using workspace's credentials
 515    workspace_response = api_util.get_workspace(
 516        workspace_id=workspace.workspace_id,
 517        api_root=workspace.api_root,
 518        client_id=workspace.client_id,
 519        client_secret=workspace.client_secret,
 520        bearer_token=workspace.bearer_token,
 521    )
 522
 523    # Try to get organization info, but fail gracefully if we don't have permissions.
 524    # Fetching organization info requires ORGANIZATION_READER permissions on the organization,
 525    # which may not be available with workspace-scoped credentials.
 526    organization = workspace.get_organization(raise_on_error=False)
 527
 528    return CloudWorkspaceResult(
 529        workspace_id=workspace_response.workspace_id,
 530        workspace_name=workspace_response.name,
 531        workspace_url=workspace.workspace_url,
 532        organization_id=(
 533            organization.organization_id
 534            if organization
 535            else "[unavailable - requires ORGANIZATION_READER permission]"
 536        ),
 537        organization_name=organization.organization_name if organization else None,
 538    )
 539
 540
 541@mcp_tool(
 542    domain="cloud",
 543    open_world=True,
 544    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 545)
 546def deploy_noop_destination_to_cloud(
 547    name: str = "No-op Destination",
 548    *,
 549    workspace_id: Annotated[
 550        str | None,
 551        Field(
 552            description=WORKSPACE_ID_TIP_TEXT,
 553            default=None,
 554        ),
 555    ],
 556    unique: bool = True,
 557) -> str:
 558    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 559    destination = get_noop_destination()
 560    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 561    deployed_destination = workspace.deploy_destination(
 562        name=name,
 563        destination=destination,
 564        unique=unique,
 565    )
 566    register_guid_created_in_session(deployed_destination.connector_id)
 567    return (
 568        f"Successfully deployed No-op Destination "
 569        f"with ID '{deployed_destination.connector_id}' and "
 570        f"URL: {deployed_destination.connector_url}"
 571    )
 572
 573
 574@mcp_tool(
 575    domain="cloud",
 576    read_only=True,
 577    idempotent=True,
 578    open_world=True,
 579    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 580)
 581def get_cloud_sync_status(
 582    connection_id: Annotated[
 583        str,
 584        Field(
 585            description="The ID of the Airbyte Cloud connection.",
 586        ),
 587    ],
 588    job_id: Annotated[
 589        int | None,
 590        Field(
 591            description="Optional job ID. If not provided, the latest job will be used.",
 592            default=None,
 593        ),
 594    ],
 595    *,
 596    workspace_id: Annotated[
 597        str | None,
 598        Field(
 599            description=WORKSPACE_ID_TIP_TEXT,
 600            default=None,
 601        ),
 602    ],
 603    include_attempts: Annotated[
 604        bool,
 605        Field(
 606            description="Whether to include detailed attempts information.",
 607            default=False,
 608        ),
 609    ],
 610) -> dict[str, Any]:
 611    """Get the status of a sync job from the Airbyte Cloud."""
 612    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 613    connection = workspace.get_connection(connection_id=connection_id)
 614
 615    # If a job ID is provided, get the job by ID.
 616    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 617
 618    if not sync_result:
 619        return {"status": None, "job_id": None, "attempts": []}
 620
 621    result = {
 622        "status": sync_result.get_job_status(),
 623        "job_id": sync_result.job_id,
 624        "bytes_synced": sync_result.bytes_synced,
 625        "records_synced": sync_result.records_synced,
 626        "start_time": sync_result.start_time.isoformat(),
 627        "job_url": sync_result.job_url,
 628        "attempts": [],
 629    }
 630
 631    if include_attempts:
 632        attempts = sync_result.get_attempts()
 633        result["attempts"] = [
 634            {
 635                "attempt_number": attempt.attempt_number,
 636                "attempt_id": attempt.attempt_id,
 637                "status": attempt.status,
 638                "bytes_synced": attempt.bytes_synced,
 639                "records_synced": attempt.records_synced,
 640                "created_at": attempt.created_at.isoformat(),
 641            }
 642            for attempt in attempts
 643        ]
 644
 645    return result
 646
 647
 648@mcp_tool(
 649    domain="cloud",
 650    read_only=True,
 651    idempotent=True,
 652    open_world=True,
 653    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 654)
 655def list_cloud_sync_jobs(
 656    connection_id: Annotated[
 657        str,
 658        Field(description="The ID of the Airbyte Cloud connection."),
 659    ],
 660    *,
 661    workspace_id: Annotated[
 662        str | None,
 663        Field(
 664            description=WORKSPACE_ID_TIP_TEXT,
 665            default=None,
 666        ),
 667    ],
 668    max_jobs: Annotated[
 669        int,
 670        Field(
 671            description=(
 672                "Maximum number of jobs to return. "
 673                "Defaults to 20 if not specified. "
 674                "Maximum allowed value is 500."
 675            ),
 676            default=20,
 677        ),
 678    ],
 679    from_tail: Annotated[
 680        bool | None,
 681        Field(
 682            description=(
 683                "When True, jobs are ordered newest-first (createdAt DESC). "
 684                "When False, jobs are ordered oldest-first (createdAt ASC). "
 685                "Defaults to True if `jobs_offset` is not specified. "
 686                "Cannot combine `from_tail=True` with `jobs_offset`."
 687            ),
 688            default=None,
 689        ),
 690    ],
 691    jobs_offset: Annotated[
 692        int | None,
 693        Field(
 694            description=(
 695                "Number of jobs to skip from the beginning. "
 696                "Cannot be combined with `from_tail=True`."
 697            ),
 698            default=None,
 699        ),
 700    ],
 701) -> SyncJobListResult:
 702    """List sync jobs for a connection with pagination support.
 703
 704    This tool allows you to retrieve a list of sync jobs for a connection,
 705    with control over ordering and pagination. By default, jobs are returned
 706    newest-first (from_tail=True).
 707    """
 708    # Validate that jobs_offset and from_tail are not both set
 709    if jobs_offset is not None and from_tail is True:
 710        raise PyAirbyteInputError(
 711            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
 712            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
 713        )
 714
 715    # Default to from_tail=True if neither is specified
 716    if from_tail is None and jobs_offset is None:
 717        from_tail = True
 718    elif from_tail is None:
 719        from_tail = False
 720
 721    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 722    connection = workspace.get_connection(connection_id=connection_id)
 723
 724    # Cap at 500 to avoid overloading agent context
 725    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 726
 727    sync_results = connection.get_previous_sync_logs(
 728        limit=effective_limit,
 729        offset=jobs_offset,
 730        from_tail=from_tail,
 731    )
 732
 733    jobs = [
 734        SyncJobResult(
 735            job_id=sync_result.job_id,
 736            status=str(sync_result.get_job_status()),
 737            bytes_synced=sync_result.bytes_synced,
 738            records_synced=sync_result.records_synced,
 739            start_time=sync_result.start_time.isoformat(),
 740            job_url=sync_result.job_url,
 741        )
 742        for sync_result in sync_results
 743    ]
 744
 745    return SyncJobListResult(
 746        jobs=jobs,
 747        jobs_count=len(jobs),
 748        jobs_offset=jobs_offset or 0,
 749        from_tail=from_tail,
 750    )
 751
 752
 753@mcp_tool(
 754    domain="cloud",
 755    read_only=True,
 756    idempotent=True,
 757    open_world=True,
 758    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 759)
 760def list_deployed_cloud_source_connectors(
 761    *,
 762    workspace_id: Annotated[
 763        str | None,
 764        Field(
 765            description=WORKSPACE_ID_TIP_TEXT,
 766            default=None,
 767        ),
 768    ],
 769    name_contains: Annotated[
 770        str | None,
 771        Field(
 772            description="Optional case-insensitive substring to filter sources by name",
 773            default=None,
 774        ),
 775    ],
 776    max_items_limit: Annotated[
 777        int | None,
 778        Field(
 779            description="Optional maximum number of items to return (default: no limit)",
 780            default=None,
 781        ),
 782    ],
 783) -> list[CloudSourceResult]:
 784    """List all deployed source connectors in the Airbyte Cloud workspace."""
 785    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 786    sources = workspace.list_sources()
 787
 788    # Filter by name if requested
 789    if name_contains:
 790        needle = name_contains.lower()
 791        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 792
 793    # Apply limit if requested
 794    if max_items_limit is not None:
 795        sources = sources[:max_items_limit]
 796
 797    # Note: name and url are guaranteed non-null from list API responses
 798    return [
 799        CloudSourceResult(
 800            id=source.source_id,
 801            name=cast(str, source.name),
 802            url=cast(str, source.connector_url),
 803        )
 804        for source in sources
 805    ]
 806
 807
 808@mcp_tool(
 809    domain="cloud",
 810    read_only=True,
 811    idempotent=True,
 812    open_world=True,
 813    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 814)
 815def list_deployed_cloud_destination_connectors(
 816    *,
 817    workspace_id: Annotated[
 818        str | None,
 819        Field(
 820            description=WORKSPACE_ID_TIP_TEXT,
 821            default=None,
 822        ),
 823    ],
 824    name_contains: Annotated[
 825        str | None,
 826        Field(
 827            description="Optional case-insensitive substring to filter destinations by name",
 828            default=None,
 829        ),
 830    ],
 831    max_items_limit: Annotated[
 832        int | None,
 833        Field(
 834            description="Optional maximum number of items to return (default: no limit)",
 835            default=None,
 836        ),
 837    ],
 838) -> list[CloudDestinationResult]:
 839    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 840    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 841    destinations = workspace.list_destinations()
 842
 843    # Filter by name if requested
 844    if name_contains:
 845        needle = name_contains.lower()
 846        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 847
 848    # Apply limit if requested
 849    if max_items_limit is not None:
 850        destinations = destinations[:max_items_limit]
 851
 852    # Note: name and url are guaranteed non-null from list API responses
 853    return [
 854        CloudDestinationResult(
 855            id=destination.destination_id,
 856            name=cast(str, destination.name),
 857            url=cast(str, destination.connector_url),
 858        )
 859        for destination in destinations
 860    ]
 861
 862
 863@mcp_tool(
 864    domain="cloud",
 865    read_only=True,
 866    idempotent=True,
 867    open_world=True,
 868    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 869)
 870def describe_cloud_source(
 871    source_id: Annotated[
 872        str,
 873        Field(description="The ID of the source to describe."),
 874    ],
 875    *,
 876    workspace_id: Annotated[
 877        str | None,
 878        Field(
 879            description=WORKSPACE_ID_TIP_TEXT,
 880            default=None,
 881        ),
 882    ],
 883) -> CloudSourceDetails:
 884    """Get detailed information about a specific deployed source connector."""
 885    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 886    source = workspace.get_source(source_id=source_id)
 887
 888    # Access name property to ensure _connector_info is populated
 889    source_name = cast(str, source.name)
 890
 891    return CloudSourceDetails(
 892        source_id=source.source_id,
 893        source_name=source_name,
 894        source_url=source.connector_url,
 895        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 896    )
 897
 898
 899@mcp_tool(
 900    domain="cloud",
 901    read_only=True,
 902    idempotent=True,
 903    open_world=True,
 904    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 905)
 906def describe_cloud_destination(
 907    destination_id: Annotated[
 908        str,
 909        Field(description="The ID of the destination to describe."),
 910    ],
 911    *,
 912    workspace_id: Annotated[
 913        str | None,
 914        Field(
 915            description=WORKSPACE_ID_TIP_TEXT,
 916            default=None,
 917        ),
 918    ],
 919) -> CloudDestinationDetails:
 920    """Get detailed information about a specific deployed destination connector."""
 921    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 922    destination = workspace.get_destination(destination_id=destination_id)
 923
 924    # Access name property to ensure _connector_info is populated
 925    destination_name = cast(str, destination.name)
 926
 927    return CloudDestinationDetails(
 928        destination_id=destination.destination_id,
 929        destination_name=destination_name,
 930        destination_url=destination.connector_url,
 931        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 932    )
 933
 934
 935@mcp_tool(
 936    domain="cloud",
 937    read_only=True,
 938    idempotent=True,
 939    open_world=True,
 940    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 941)
 942def describe_cloud_connection(
 943    connection_id: Annotated[
 944        str,
 945        Field(description="The ID of the connection to describe."),
 946    ],
 947    *,
 948    workspace_id: Annotated[
 949        str | None,
 950        Field(
 951            description=WORKSPACE_ID_TIP_TEXT,
 952            default=None,
 953        ),
 954    ],
 955) -> CloudConnectionDetails:
 956    """Get detailed information about a specific deployed connection."""
 957    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
 958    connection = workspace.get_connection(connection_id=connection_id)
 959
 960    return CloudConnectionDetails(
 961        connection_id=connection.connection_id,
 962        connection_name=cast(str, connection.name),
 963        connection_url=cast(str, connection.connection_url),
 964        source_id=connection.source_id,
 965        source_name=cast(str, connection.source.name),
 966        destination_id=connection.destination_id,
 967        destination_name=cast(str, connection.destination.name),
 968        selected_streams=connection.stream_names,
 969        table_prefix=connection.table_prefix,
 970    )
 971
 972
 973@mcp_tool(
 974    domain="cloud",
 975    read_only=True,
 976    idempotent=True,
 977    open_world=True,
 978    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 979)
 980def get_cloud_sync_logs(
 981    connection_id: Annotated[
 982        str,
 983        Field(description="The ID of the Airbyte Cloud connection."),
 984    ],
 985    job_id: Annotated[
 986        int | None,
 987        Field(description="Optional job ID. If not provided, the latest job will be used."),
 988    ] = None,
 989    attempt_number: Annotated[
 990        int | None,
 991        Field(
 992            description="Optional attempt number. If not provided, the latest attempt will be used."
 993        ),
 994    ] = None,
 995    *,
 996    workspace_id: Annotated[
 997        str | None,
 998        Field(
 999            description=WORKSPACE_ID_TIP_TEXT,
1000            default=None,
1001        ),
1002    ],
1003    max_lines: Annotated[
1004        int,
1005        Field(
1006            description=(
1007                "Maximum number of lines to return. "
1008                "Defaults to 4000 if not specified. "
1009                "If '0' is provided, no limit is applied."
1010            ),
1011            default=4000,
1012        ),
1013    ],
1014    from_tail: Annotated[
1015        bool | None,
1016        Field(
1017            description=(
1018                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
1019                "Defaults to True if `line_offset` is not specified. "
1020                "Cannot combine `from_tail=True` with `line_offset`."
1021            ),
1022            default=None,
1023        ),
1024    ],
1025    line_offset: Annotated[
1026        int | None,
1027        Field(
1028            description=(
1029                "Number of lines to skip from the beginning of the logs. "
1030                "Cannot be combined with `from_tail=True`."
1031            ),
1032            default=None,
1033        ),
1034    ],
1035) -> LogReadResult:
1036    """Get the logs from a sync job attempt on Airbyte Cloud."""
1037    # Validate that line_offset and from_tail are not both set
1038    if line_offset is not None and from_tail:
1039        raise PyAirbyteInputError(
1040            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
1041            context={"line_offset": line_offset, "from_tail": from_tail},
1042        )
1043
1044    if from_tail is None and line_offset is None:
1045        from_tail = True
1046    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
1047    connection = workspace.get_connection(connection_id=connection_id)
1048
1049    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
1050
1051    if not sync_result:
1052        raise AirbyteMissingResourceError(
1053            resource_type="sync job",
1054            resource_name_or_id=connection_id,
1055        )
1056
1057    attempts = sync_result.get_attempts()
1058
1059    if not attempts:
1060        raise AirbyteMissingResourceError(
1061            resource_type="sync attempt",
1062            resource_name_or_id=str(sync_result.job_id),
1063        )
1064
1065    if attempt_number is not None:
1066        target_attempt = None
1067        for attempt in attempts:
1068            if attempt.attempt_number == attempt_number:
1069                target_attempt = attempt
1070                break
1071
1072        if target_attempt is None:
1073            raise AirbyteMissingResourceError(
1074                resource_type="sync attempt",
1075                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
1076            )
1077    else:
1078        target_attempt = max(attempts, key=lambda a: a.attempt_number)
1079
1080    logs = target_attempt.get_full_log_text()
1081
1082    if not logs:
1083        # Return empty result with zero lines
1084        return LogReadResult(
1085            log_text=(
1086                f"[No logs available for job '{sync_result.job_id}', "
1087                f"attempt {target_attempt.attempt_number}.]"
1088            ),
1089            log_text_start_line=1,
1090            log_text_line_count=0,
1091            total_log_lines_available=0,
1092            job_id=sync_result.job_id,
1093            attempt_number=target_attempt.attempt_number,
1094        )
1095
1096    # Apply line limiting
1097    log_lines = logs.splitlines()
1098    total_lines = len(log_lines)
1099
1100    # Determine effective max_lines (0 means no limit)
1101    effective_max = total_lines if max_lines == 0 else max_lines
1102
1103    # Calculate start_index and slice based on from_tail or line_offset
1104    if from_tail:
1105        start_index = max(0, total_lines - effective_max)
1106        selected_lines = log_lines[start_index:][:effective_max]
1107    else:
1108        start_index = line_offset or 0
1109        selected_lines = log_lines[start_index : start_index + effective_max]
1110
1111    return LogReadResult(
1112        log_text="\n".join(selected_lines),
1113        log_text_start_line=start_index + 1,  # Convert to 1-based index
1114        log_text_line_count=len(selected_lines),
1115        total_log_lines_available=total_lines,
1116        job_id=sync_result.job_id,
1117        attempt_number=target_attempt.attempt_number,
1118    )
1119
1120
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.

    When max_items_limit is set, at most that many results are returned
    (a limit of 0 returns an empty list).
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        # Check the limit BEFORE doing any per-connection work. Checking after
        # the append (as previously written) returned one item for a limit of
        # 0, and also paid for an unnecessary sync-status lookup on the
        # connection past the limit.
        if max_items_limit is not None and len(results) >= max_items_limit:
            break

        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                # In-progress jobs are recorded separately; keep scanning for
                # the most recent *completed* job.
                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            # Skip healthy connections when only failing ones were requested.
            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

    return results
1240
1241
def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> api_util.models.OrganizationResponse:
    """Resolve an organization from either its ID or its exact name.

    Exactly one of `organization_id` / `organization_name` must be supplied.

    Args:
        organization_id: The organization ID (if provided directly)
        organization_name: The organization name (exact, case-sensitive match)
        api_root: The API root URL
        client_id: OAuth client ID (optional if bearer_token is provided)
        client_secret: OAuth client secret (optional if bearer_token is provided)
        bearer_token: Bearer token for authentication (optional if client credentials provided)

    Returns:
        The resolved OrganizationResponse object

    Raises:
        PyAirbyteInputError: If neither or both selectors are provided, or if the
            name matches more than one organization
        AirbyteMissingResourceError: If no organization matches the selector
    """
    if organization_id and organization_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not (organization_id or organization_name):
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Fetch every organization visible to the authenticated user.
    user_orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )

    if organization_id:
        # ID lookup: a single-pass search is sufficient (IDs are unique).
        found = next(
            (org for org in user_orgs if org.organization_id == organization_id),
            None,
        )
        if found is None:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
        return found

    # Name lookup: collect all matches so ambiguity can be detected.
    name_matches = [org for org in user_orgs if org.organization_name == organization_name]

    if not name_matches:
        raise AirbyteMissingResourceError(
            resource_type="organization",
            context={
                "organization_name": organization_name,
                "message": f"No organization found with exact name '{organization_name}' "
                "for the current user.",
            },
        )

    if len(name_matches) > 1:
        raise PyAirbyteInputError(
            message=f"Multiple organizations found with name '{organization_name}'. "
            "Please use 'organization_id' instead to specify the exact organization."
        )

    return name_matches[0]
1320
1321
def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> str:
    """Return only the ID of the organization resolved by `_resolve_organization`.

    Thin convenience wrapper; all validation and lookup semantics are inherited
    from `_resolve_organization`.
    """
    return _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    ).organization_id
1344
1345
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    credentials = resolve_cloud_credentials()

    # Translate whichever selector was given into a concrete organization ID.
    org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
    )

    # Filtering and limiting happen server-side in the API helper.
    raw_workspaces = api_util.list_workspaces_in_organization(
        organization_id=org_id,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [
        CloudWorkspaceResult(
            workspace_id=raw.get("workspaceId", ""),
            workspace_name=raw.get("name", ""),
            organization_id=raw.get("organizationId", ""),
        )
        for raw in raw_workspaces
    ]
1421
1422
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    credentials = resolve_cloud_credentials()

    resolved = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
    )

    # Repackage the API response into the MCP result model.
    return CloudOrganizationResult(
        id=resolved.organization_id,
        name=resolved.organization_name,
        email=resolved.email,
    )
1470
1471
def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    """Render a human-readable, multi-line summary of a custom source definition."""
    labeled_values = (
        ("Custom Source Name", custom_source.name),
        ("Definition ID", custom_source.definition_id),
        ("Definition Version", custom_source.version),
        ("Connector Builder Project ID", custom_source.connector_builder_project_id),
        ("Connector Builder Project URL", custom_source.connector_builder_project_url),
    )
    return "\n".join(f" - {label}: {value}" for label, value in labeled_values)
1484
1485
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A single-line string is treated as a file path rather than inline YAML.
    manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest = Path(manifest_yaml)

    # Resolve the effective testing values from the inline config and/or secret.
    resolved_testing_values: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        resolved_testing_values = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    published = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=resolved_testing_values,
    )
    # Track the new definition so session-scoped safety checks allow later edits.
    register_guid_created_in_session(published.definition_id)

    description = _get_custom_source_definition_description(custom_source=published)
    return f"Successfully published custom YAML source definition:\n{description}\n"
1591
1592
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Summarize each YAML definition as a plain dict for the MCP response.
    return [
        {
            "definition_id": defn.definition_id,
            "name": defn.name,
            "version": defn.version,
            "connector_builder_project_url": defn.connector_builder_project_url,
        }
        for defn in workspace.list_custom_source_definitions(definition_type="yaml")
    ]
1628
1629
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    # Session-scoped safety gate: only definitions created in this session may be edited.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    nothing_to_update = (
        manifest_yaml is None
        and testing_values is None
        and testing_values_secret_name is None
    )
    if nothing_to_update:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    # A single-line string is treated as a file path rather than inline YAML.
    manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest = Path(manifest_yaml)

    # Resolve the effective testing values from the inline config and/or secret.
    resolved_testing_values: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        resolved_testing_values = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    updated: CustomCloudSourceDefinition = definition

    if manifest is not None:
        updated = definition.update_definition(
            manifest_yaml=manifest,
            pre_validate=pre_validate,
        )

    if resolved_testing_values is not None:
        updated.set_testing_values(resolved_testing_values)

    description = _get_custom_source_definition_description(custom_source=updated)
    return "Successfully updated custom YAML source definition:\n" + description
1750
1751
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Session-scoped safety gate: only definitions created in this session may be deleted.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    found_name: str = definition.name

    # The caller-supplied name must match exactly before deletion is allowed.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # Safe mode is hard-coded as extra protection when running in LLM agents.
    definition.permanently_delete(safe_mode=True)
    return f"Successfully deleted custom source definition '{found_name}' (ID: {definition_id})"
1817
1818
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,  # Added for parity with the other cloud tools; default preserves old behavior.
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Session guard: when safe mode is enabled, destructive operations are
    # limited to resources created within this session.
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Require an exact name match before deleting anything (guards against
    # wrong-ID mistakes).
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
1873
1874
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,  # Added for parity with the other cloud tools; default preserves old behavior.
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Session guard: when safe mode is enabled, destructive operations are
    # limited to resources created within this session.
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Require an exact name match before deleting anything (guards against
    # wrong-ID mistakes).
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
1929
1930
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,  # Added for parity with the other cloud tools; default preserves old behavior.
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Session guard: when safe mode is enabled, destructive operations are
    # limited to resources created within this session.
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, connection.name)

    # Require an exact name match before deleting anything (guards against
    # wrong-ID mistakes).
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_connection(
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
2006
2007
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Assign a new display name to a deployed source connector on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    src = ws.get_source(source_id=source_id)
    src.rename(name=name)
    new_url = src.connector_url
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {new_url}"
2036
2037
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Replace the configuration of a deployed source connector on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    src = ws.get_source(source_id=source_id)

    # Resolve the new config from an inline value or a named secret.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )
    src.update_config(config=resolved_config)

    return f"Successfully updated source '{source_id}'. URL: {src.connector_url}"
2088
2089
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Assign a new display name to a deployed destination connector on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    dest = ws.get_destination(destination_id=destination_id)
    dest.rename(name=name)
    new_url = dest.connector_url
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {new_url}"
    )
2121
2122
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,  # Signature default added for parity with `update_cloud_source_config`.
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Session guard: when safe mode is enabled, destructive operations are
    # limited to resources created within this session.
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Resolve the new config from an inline dict/JSON string or a named secret.
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return f"Successfully updated destination '{destination_id}'. URL: {destination.connector_url}"
2175
2176
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Assign a new display name to a connection on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)
    conn.rename(name=name)
    new_url = conn.connection_url
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {new_url}"
    )
2208
2209
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Change the destination table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)
    conn.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {conn.connection_url}"
    )
2247
2248
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Replace the set of streams selected for syncing on a connection in Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)

    # Normalize a single name or comma/list input into an explicit list.
    streams: list[str] = resolve_list_of_strings(stream_names)
    conn.set_selected_streams(stream_names=streams)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {streams}. URL: {conn.connection_url}"
    )
2294
2295
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any] | list[dict[str, Any]]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the persisted state for incremental syncs as a list of
      stream state objects, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)

    # Fetch the requested artifact and pick the matching "not found" message.
    if artifact_type == "state":
        artifact = conn.get_state_artifacts()
        missing_msg = "No state is set for this connection (stateType: not_set)"
    else:  # artifact_type == "catalog"
        artifact = conn.get_catalog_artifact()
        missing_msg = "No catalog found for this connection"

    if artifact is None:
        return {"ERROR": missing_msg}
    return artifact
2343
2344
def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register the cloud-domain tools with the FastMCP app.

    Internal helper -- not intended to be called directly.

    Which tools are exposed depends on mode settings:
    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: only read-only tools are registered.
    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: all tools are registered, but destructive
      operations are guarded by runtime session checks.
    """
    register_tools(app, domain="cloud")
CLOUD_AUTH_TIP_TEXT = 'By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables will be used to authenticate with the Airbyte Cloud API.'
WORKSPACE_ID_TIP_TEXT = 'Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.'
class CloudSourceResult(pydantic.main.BaseModel):
41class CloudSourceResult(BaseModel):
42    """Information about a deployed source connector in Airbyte Cloud."""
43
44    id: str
45    """The source ID."""
46    name: str
47    """Display name of the source."""
48    url: str
49    """Web URL for managing this source in Airbyte Cloud."""

Information about a deployed source connector in Airbyte Cloud.

id: str = PydanticUndefined

The source ID.

name: str = PydanticUndefined

Display name of the source.

url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

class CloudDestinationResult(pydantic.main.BaseModel):
52class CloudDestinationResult(BaseModel):
53    """Information about a deployed destination connector in Airbyte Cloud."""
54
55    id: str
56    """The destination ID."""
57    name: str
58    """Display name of the destination."""
59    url: str
60    """Web URL for managing this destination in Airbyte Cloud."""

Information about a deployed destination connector in Airbyte Cloud.

id: str = PydanticUndefined

The destination ID.

name: str = PydanticUndefined

Display name of the destination.

url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

class CloudConnectionResult(pydantic.main.BaseModel):
63class CloudConnectionResult(BaseModel):
64    """Information about a deployed connection in Airbyte Cloud."""
65
66    id: str
67    """The connection ID."""
68    name: str
69    """Display name of the connection."""
70    url: str
71    """Web URL for managing this connection in Airbyte Cloud."""
72    source_id: str
73    """ID of the source used by this connection."""
74    destination_id: str
75    """ID of the destination used by this connection."""
76    last_job_status: str | None = None
77    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
78    Only populated when with_connection_status=True."""
79    last_job_id: int | None = None
80    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
81    last_job_time: str | None = None
82    """ISO 8601 timestamp of the most recent completed sync.
83    Only populated when with_connection_status=True."""
84    currently_running_job_id: int | None = None
85    """Job ID of a currently running sync, if any.
86    Only populated when with_connection_status=True."""
87    currently_running_job_start_time: str | None = None
88    """ISO 8601 timestamp of when the currently running sync started.
89    Only populated when with_connection_status=True."""

Information about a deployed connection in Airbyte Cloud.

id: str = PydanticUndefined

The connection ID.

name: str = PydanticUndefined

Display name of the connection.

url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

last_job_status: str | None = None

Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.

last_job_id: int | None = None

Job ID of the most recent completed sync. Only populated when with_connection_status=True.

last_job_time: str | None = None

ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.

currently_running_job_id: int | None = None

Job ID of a currently running sync, if any. Only populated when with_connection_status=True.

currently_running_job_start_time: str | None = None

ISO 8601 timestamp of when the currently running sync started. Only populated when with_connection_status=True.

class CloudSourceDetails(pydantic.main.BaseModel):
 92class CloudSourceDetails(BaseModel):
 93    """Detailed information about a deployed source connector in Airbyte Cloud."""
 94
 95    source_id: str
 96    """The source ID."""
 97    source_name: str
 98    """Display name of the source."""
 99    source_url: str
100    """Web URL for managing this source in Airbyte Cloud."""
101    connector_definition_id: str
102    """The connector definition ID (e.g., the ID for 'source-postgres')."""

Detailed information about a deployed source connector in Airbyte Cloud.

source_id: str = PydanticUndefined

The source ID.

source_name: str = PydanticUndefined

Display name of the source.

source_url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'source-postgres').

class CloudDestinationDetails(pydantic.main.BaseModel):
105class CloudDestinationDetails(BaseModel):
106    """Detailed information about a deployed destination connector in Airbyte Cloud."""
107
108    destination_id: str
109    """The destination ID."""
110    destination_name: str
111    """Display name of the destination."""
112    destination_url: str
113    """Web URL for managing this destination in Airbyte Cloud."""
114    connector_definition_id: str
115    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""

Detailed information about a deployed destination connector in Airbyte Cloud.

destination_id: str = PydanticUndefined

The destination ID.

destination_name: str = PydanticUndefined

Display name of the destination.

destination_url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'destination-snowflake').

class CloudConnectionDetails(pydantic.main.BaseModel):
118class CloudConnectionDetails(BaseModel):
119    """Detailed information about a deployed connection in Airbyte Cloud."""
120
121    connection_id: str
122    """The connection ID."""
123    connection_name: str
124    """Display name of the connection."""
125    connection_url: str
126    """Web URL for managing this connection in Airbyte Cloud."""
127    source_id: str
128    """ID of the source used by this connection."""
129    source_name: str
130    """Display name of the source."""
131    destination_id: str
132    """ID of the destination used by this connection."""
133    destination_name: str
134    """Display name of the destination."""
135    selected_streams: list[str]
136    """List of stream names selected for syncing."""
137    table_prefix: str | None
138    """Table prefix applied when syncing to the destination."""

Detailed information about a deployed connection in Airbyte Cloud.

connection_id: str = PydanticUndefined

The connection ID.

connection_name: str = PydanticUndefined

Display name of the connection.

connection_url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

source_name: str = PydanticUndefined

Display name of the source.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

destination_name: str = PydanticUndefined

Display name of the destination.

selected_streams: list[str] = PydanticUndefined

List of stream names selected for syncing.

table_prefix: str | None = PydanticUndefined

Table prefix applied when syncing to the destination.

class CloudOrganizationResult(pydantic.main.BaseModel):
141class CloudOrganizationResult(BaseModel):
142    """Information about an organization in Airbyte Cloud."""
143
144    id: str
145    """The organization ID."""
146    name: str
147    """Display name of the organization."""
148    email: str
149    """Email associated with the organization."""

Information about an organization in Airbyte Cloud.

id: str = PydanticUndefined

The organization ID.

name: str = PydanticUndefined

Display name of the organization.

email: str = PydanticUndefined

Email associated with the organization.

class CloudWorkspaceResult(pydantic.main.BaseModel):
152class CloudWorkspaceResult(BaseModel):
153    """Information about a workspace in Airbyte Cloud."""
154
155    workspace_id: str
156    """The workspace ID."""
157    workspace_name: str
158    """Display name of the workspace."""
159    workspace_url: str | None = None
160    """URL to access the workspace in Airbyte Cloud."""
161    organization_id: str
162    """ID of the organization (requires ORGANIZATION_READER permission)."""
163    organization_name: str | None = None
164    """Name of the organization (requires ORGANIZATION_READER permission)."""

Information about a workspace in Airbyte Cloud.

workspace_id: str = PydanticUndefined

The workspace ID.

workspace_name: str = PydanticUndefined

Display name of the workspace.

workspace_url: str | None = None

URL to access the workspace in Airbyte Cloud.

organization_id: str = PydanticUndefined

ID of the organization (requires ORGANIZATION_READER permission).

organization_name: str | None = None

Name of the organization (requires ORGANIZATION_READER permission).

class LogReadResult(pydantic.main.BaseModel):
167class LogReadResult(BaseModel):
168    """Result of reading sync logs with pagination support."""
169
170    job_id: int
171    """The job ID the logs belong to."""
172    attempt_number: int
173    """The attempt number the logs belong to."""
174    log_text: str
175    """The string containing the log text we are returning."""
176    log_text_start_line: int
177    """1-based line index of the first line returned."""
178    log_text_line_count: int
179    """Count of lines we are returning."""
180    total_log_lines_available: int
181    """Total number of log lines available, shows if any lines were missed due to the limit."""

Result of reading sync logs with pagination support.

job_id: int = PydanticUndefined

The job ID the logs belong to.

attempt_number: int = PydanticUndefined

The attempt number the logs belong to.

log_text: str = PydanticUndefined

The string containing the log text we are returning.

log_text_start_line: int = PydanticUndefined

1-based line index of the first line returned.

log_text_line_count: int = PydanticUndefined

Count of lines we are returning.

total_log_lines_available: int = PydanticUndefined

Total number of log lines available, shows if any lines were missed due to the limit.

class SyncJobResult(pydantic.main.BaseModel):
184class SyncJobResult(BaseModel):
185    """Information about a sync job."""
186
187    job_id: int
188    """The job ID."""
189    status: str
190    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
191    bytes_synced: int
192    """Number of bytes synced in this job."""
193    records_synced: int
194    """Number of records synced in this job."""
195    start_time: str
196    """ISO 8601 timestamp of when the job started."""
197    job_url: str
198    """URL to view the job in Airbyte Cloud."""

Information about a sync job.

job_id: int = PydanticUndefined

The job ID.

status: str = PydanticUndefined

The job status (e.g., 'succeeded', 'failed', 'running', 'pending').

bytes_synced: int = PydanticUndefined

Number of bytes synced in this job.

records_synced: int = PydanticUndefined

Number of records synced in this job.

start_time: str = PydanticUndefined

ISO 8601 timestamp of when the job started.

job_url: str = PydanticUndefined

URL to view the job in Airbyte Cloud.

class SyncJobListResult(pydantic.main.BaseModel):
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support.

    `jobs_offset` and `from_tail` echo the pagination settings that were
    applied when producing this page of results.
    """

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""

Result of listing sync jobs with pagination support.

jobs: list[SyncJobResult] = PydanticUndefined

List of sync jobs.

jobs_count: int = PydanticUndefined

Number of jobs returned in this response.

jobs_offset: int = PydanticUndefined

Offset used for this request (0 if not specified).

from_tail: bool = PydanticUndefined

Whether jobs are ordered newest-first (True) or oldest-first (False).

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud( source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')], source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # Build a config-only source object (no local executor needed for deploy).
    connector = get_source(
        source_connector_name,
        no_executor=True,
    )
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=connector.config_spec,
    )
    # Validate the resolved config against the connector spec before deploying.
    connector.set_config(resolved_config, validate=True)

    target_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed = target_workspace.deploy_source(
        name=source_name,
        source=connector,
        unique=unique,
    )

    # Track the new GUID so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(deployed.connector_id)
    return (
        f"Successfully deployed source '{source_name}' "
        f"with ID '{deployed.connector_id}' and URL: {deployed.connector_url}"
    )

Deploy a source connector to Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_destination_to_cloud( destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')], destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    # Build a config-only destination object (no local executor needed).
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    # Validate against the connector spec before deploying.
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    # Track the new GUID so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(deployed_destination.connector_id)
    # Include the connector URL for consistency with `deploy_source_to_cloud`
    # and `deploy_noop_destination_to_cloud`, which both return it.
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy a destination connector to Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud( connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')], source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')], destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')], selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    # Normalize the stream selection (string or list) into a list of names.
    stream_names: list[str] = resolve_list_of_strings(selected_streams)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = cloud_workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=stream_names,
        table_prefix=table_prefix,
    )

    # Track the new GUID so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{connection.connection_id}' and "
        f"URL: {connection.connection_url}"
    )

Create a connection between a deployed source and destination on Airbyte Cloud.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.')], wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target_connection = cloud_workspace.get_connection(connection_id=connection_id)
    result = target_connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    if not wait:
        # Fire-and-forget: report the job handle without blocking.
        return f"Sync started. Job ID is '{result.job_id}' and job URL is: {result.job_url}"

    final_status = result.get_job_status()
    return (
        f"Sync completed with status: {final_status}. "
        f"Job ID is '{result.job_id}' and "
        f"job URL is: {result.job_url}"
    )

Run a sync job on Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def check_airbyte_cloud_workspace( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudWorkspaceResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, and organization info.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Look up workspace details via the public API, reusing this workspace's
    # own credentials (client id/secret or bearer token).
    details = api_util.get_workspace(
        workspace_id=workspace.workspace_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    # Organization info requires ORGANIZATION_READER permission, which
    # workspace-scoped credentials may lack; degrade gracefully rather than fail.
    org = workspace.get_organization(raise_on_error=False)
    if org:
        org_id = org.organization_id
        org_name = org.organization_name
    else:
        org_id = "[unavailable - requires ORGANIZATION_READER permission]"
        org_name = None

    return CloudWorkspaceResult(
        workspace_id=details.workspace_id,
        workspace_name=details.name,
        workspace_url=workspace.workspace_url,
        organization_id=org_id,
        organization_name=org_name,
    )

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, and organization info.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_noop_destination_to_cloud( name: str = 'No-op Destination', *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], unique: bool = True) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ] = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name."),
    ] = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    # `Field` descriptions added for consistency with the other cloud tools;
    # plain Python defaults are kept so direct callers are unaffected.
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )
    # Track the new GUID so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # A job_id of None means "the most recent job for this connection".
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if sync_result is None:
        # No job found: return an empty-status payload rather than raising.
        return {"status": None, "job_id": None, "attempts": []}

    attempt_details: list[dict[str, Any]] = []
    if include_attempts:
        attempt_details = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in sync_result.get_attempts()
        ]

    return {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": attempt_details,
    }

Get the status of a sync job from the Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_sync_jobs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_jobs: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=20, description='Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.')], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description='When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`.')], jobs_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`.')]) -> SyncJobListResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool allows you to retrieve a list of sync jobs for a connection,
    with control over ordering and pagination. By default, jobs are returned
    newest-first (from_tail=True).
    """
    # Tail-ordering and an explicit offset are mutually exclusive.
    if jobs_offset is not None and from_tail is True:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # Unset ordering defaults to newest-first unless an offset was given.
    if from_tail is None:
        from_tail = jobs_offset is None

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Cap at 500 to avoid overloading agent context; non-positive values
    # fall back to the default page size of 20.
    effective_limit = 20 if max_jobs <= 0 else min(max_jobs, 500)

    sync_results = connection.get_previous_sync_logs(
        limit=effective_limit,
        offset=jobs_offset,
        from_tail=from_tail,
    )

    job_entries: list[SyncJobResult] = [
        SyncJobResult(
            job_id=entry.job_id,
            status=str(entry.get_job_status()),
            bytes_synced=entry.bytes_synced,
            records_synced=entry.records_synced,
            start_time=entry.start_time.isoformat(),
            job_url=entry.job_url,
        )
        for entry in sync_results
    ]

    return SyncJobListResult(
        jobs=job_entries,
        jobs_count=len(job_entries),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )

List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection, with control over ordering and pagination. By default, jobs are returned newest-first (from_tail=True).

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter sources by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudSourceResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    matched = workspace.list_sources()

    # Optional case-insensitive substring filter on the display name.
    if name_contains:
        fragment = name_contains.lower()
        matched = [
            src
            for src in matched
            if src.name is not None and fragment in src.name.lower()
        ]

    # Truncate to the requested maximum, if one was given.
    if max_items_limit is not None:
        matched = matched[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=src.source_id,
            name=cast(str, src.name),
            url=cast(str, src.connector_url),
        )
        for src in matched
    ]

List all deployed source connectors in the Airbyte Cloud workspace.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter destinations by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudDestinationResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    matched = workspace.list_destinations()

    # Optional case-insensitive substring filter on the display name.
    if name_contains:
        fragment = name_contains.lower()
        matched = [
            dest
            for dest in matched
            if dest.name is not None and fragment in dest.name.lower()
        ]

    # Truncate to the requested maximum, if one was given.
    if max_items_limit is not None:
        matched = matched[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=dest.destination_id,
            name=cast(str, dest.name),
            url=cast(str, dest.connector_url),
        )
        for dest in matched
    ]

List all deployed destination connectors in the Airbyte Cloud workspace.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the source to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudSourceDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed = workspace.get_source(source_id=source_id)

    # Reading `.name` forces `_connector_info` to be fetched and populated.
    display_name = cast(str, deployed.name)

    return CloudSourceDetails(
        source_id=deployed.source_id,
        source_name=display_name,
        source_url=deployed.connector_url,
        connector_definition_id=deployed._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed source connector.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the destination to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudDestinationDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Reading `name` forces the lazy `_connector_info` fetch, so the
    # private access below is safe to dereference.
    name = cast(str, destination.name)
    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed destination connector.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudConnectionDetails:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    workspace = _get_cloud_workspace(workspace_id)
    conn = workspace.get_connection(connection_id=connection_id)

    # Pull the endpoint names through `cast` because the SDK types them
    # as optional even though a fetched connection always has them.
    return CloudConnectionDetails(
        connection_id=conn.connection_id,
        connection_name=cast(str, conn.name),
        connection_url=cast(str, conn.connection_url),
        source_id=conn.source_id,
        source_name=cast(str, conn.source.name),
        destination_id=conn.destination_id,
        destination_name=cast(str, conn.destination.name),
        selected_streams=conn.stream_names,
        table_prefix=conn.table_prefix,
    )

Get detailed information about a specific deployed connection.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_logs( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_lines: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=4000, description="Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.")], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`.")], line_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`.')]) -> LogReadResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    Resolves the connection, locates the requested (or latest) job and
    attempt, then returns a windowed slice of that attempt's log text.

    Raises:
        PyAirbyteInputError: If both `line_offset` and `from_tail=True` are given.
        AirbyteMissingResourceError: If no matching job or attempt exists.
    """
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # Neither windowing option given: default to tailing the log.
    # An explicit `from_tail=False` is respected (head mode).
    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # `job_id=None` means "most recent job" per the SDK call below.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        # Explicit attempt requested: linear search for the matching number.
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        # No attempt specified: use the highest-numbered (latest) attempt.
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        # Keep the last `effective_max` lines; `max(0, ...)` guards short logs.
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        # Head mode: skip `line_offset` lines (if any), then take the window.
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )

Get the logs from a sync job attempt on Airbyte Cloud.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_connections( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter connections by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')], with_connection_status: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description="If True, include status info for each connection's most recent sync job")], failing_connections_only: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description='If True, only return connections with failed/cancelled last sync')]) -> list[CloudConnectionResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        # Status fields default to None; only populated when status lookups run.
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            # Look at up to the 5 most recent sync jobs for this connection.
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                # An in-progress job is recorded as "currently running" and
                # skipped; we keep scanning for the last *completed* job.
                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                # First completed job found: capture it and stop scanning.
                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            # Drop connections whose last completed sync did not fail
            # (or that have no completed sync at all) when filtering.
            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        # Enforce the item cap after appending, so exactly `max_items_limit`
        # results are returned when the cap is reached.
        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_workspaces( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional substring to filter workspaces by name (server-side filtering)')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudWorkspaceResult]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    creds = resolve_cloud_credentials()

    # The same credential bundle is passed to both API helpers below.
    auth_kwargs = {
        "api_root": creds.api_root,
        "client_id": creds.client_id,
        "client_secret": creds.client_secret,
        "bearer_token": creds.bearer_token,
    }

    org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        **auth_kwargs,
    )

    raw_workspaces = api_util.list_workspaces_in_organization(
        organization_id=org_id,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
        **auth_kwargs,
    )

    # Map the raw API dicts onto the result model, tolerating missing keys.
    results: list[CloudWorkspaceResult] = []
    for entry in raw_workspaces:
        results.append(
            CloudWorkspaceResult(
                workspace_id=entry.get("workspaceId", ""),
                workspace_name=entry.get("name", ""),
                organization_id=entry.get("organizationId", ""),
            )
        )
    return results

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_organization( *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')]) -> CloudOrganizationResult:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    creds = resolve_cloud_credentials()

    # Resolve by whichever identifier was supplied (ID takes priority
    # inside the helper's own logic).
    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=creds.api_root,
        client_id=creds.client_id,
        client_secret=creds.client_secret,
        bearer_token=creds.bearer_token,
    )

    return CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
    )

Get details about a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided. This tool is useful for looking up an organization's ID from its name, or vice versa.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def publish_custom_source_definition( name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Heuristic: a single-line string is assumed to be a file path, not
    # inline YAML (a real manifest always spans multiple lines).
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        # `or None` collapses an empty resolved config back to None so the
        # publish call treats it as "no testing values provided".
        testing_values_dict = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    # Track the new definition so later session tools can recognize GUIDs
    # created in this session.
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions( *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace = _get_cloud_workspace(workspace_id)

    # Flatten each definition object into a plain dict for the tool result.
    results: list[dict[str, Any]] = []
    for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
        results.append(
            {
                "definition_id": definition.definition_id,
                "name": definition.name,
                "version": definition.version,
                "connector_builder_project_url": definition.connector_builder_project_url,
            }
        )
    return results

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def update_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='New manifest as YAML string or file path. Optional; omit to update only testing values.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ] = None,
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Nothing to do if no update input was supplied at all.
    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    # A single-line string is assumed to be a file path rather than inline YAML
    # (a real manifest spans multiple lines).
    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or the named secret
    # (secret values take precedence per the parameter contract above).
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None  # Treat an empty resolved config as "nothing to set".
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    if processed_manifest is not None:
        # update_definition returns the refreshed definition object; use it
        # for any subsequent operations and for the result description.
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.

@mcp_tool(domain='cloud', destructive=True, open_world=True)
def permanently_delete_custom_source_definition( definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the custom source definition (for verification).')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target = ws.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    current_name: str = target.name

    # Refuse to delete unless the caller-supplied name matches exactly.
    if name != current_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{current_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": current_name,
            },
        )

    # safe_mode stays hard-coded for extra protection when running in LLM agents.
    target.permanently_delete(safe_mode=True)
    return f"Successfully deleted custom source definition '{current_name}' (ID: {definition_id})"

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the source (for verification).')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    ws: CloudWorkspace = _get_cloud_workspace()
    deployed_source = ws.get_source(source_id=source_id)
    current_name: str = cast(str, deployed_source.name)

    # Refuse to delete unless the caller-supplied name matches exactly.
    if name != current_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{current_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": current_name,
            },
        )

    # safe_mode is always True here for extra protection when running in LLM
    # agents: the name must contain "delete-me" or "deleteme" (case insensitive).
    ws.permanently_delete_source(source=source_id, safe_mode=True)
    return f"Successfully deleted source '{current_name}' (ID: {source_id})"

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.

The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the destination (for verification).')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    ws: CloudWorkspace = _get_cloud_workspace()
    deployed_destination = ws.get_destination(destination_id=destination_id)
    current_name: str = cast(str, deployed_destination.name)

    # Refuse to delete unless the caller-supplied name matches exactly.
    if name != current_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{current_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": current_name,
            },
        )

    # safe_mode is always True here for extra protection when running in LLM
    # agents: the name must carry the "delete-me"/"deleteme" disposition.
    ws.permanently_delete_destination(destination=destination_id, safe_mode=True)
    return f"Successfully deleted destination '{current_name}' (ID: {destination_id})"

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.

The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the connection (for verification).')], *, cascade_delete_source: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the source connector associated with this connection.')] = False, cascade_delete_destination: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the destination connector associated with this connection.')] = False) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    ws: CloudWorkspace = _get_cloud_workspace()
    target_connection = ws.get_connection(connection_id=connection_id)
    current_name: str = cast(str, target_connection.name)

    # Refuse to delete unless the caller-supplied name matches exactly.
    if name != current_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{current_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": current_name,
            },
        )

    # safe_mode is always True here for extra protection when running in LLM
    # agents: the name must carry the "delete-me"/"deleteme" disposition.
    ws.permanently_delete_connection(
        safe_mode=True,
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{current_name}' (ID: {connection_id})"

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.

The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = ws.get_source(source_id=source_id)
    deployed_source.rename(name=name)
    return (
        f"Successfully renamed source '{source_id}' to '{name}'. "
        f"URL: {deployed_source.connector_url}"
    )

Rename a deployed source connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config( source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = ws.get_source(source_id=source_id)

    # No connector spec is available here, so the config is resolved without
    # schema validation.
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    deployed_source.update_config(config=resolved_config)
    return f"Successfully updated source '{source_id}'. URL: {deployed_source.connector_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = ws.get_destination(destination_id=destination_id)
    deployed_destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {deployed_destination.connector_url}"
    )

Rename a deployed destination connector on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_destination_config( destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # No connector spec is available here, so the config is resolved without
    # schema validation.
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )

    destination.update_config(config=config_dict)
    return f"Successfully updated destination '{destination_id}'. URL: {destination.connector_url}"

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target_connection = ws.get_connection(connection_id=connection_id)
    target_connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {target_connection.connection_url}"
    )

Rename a connection on Airbyte Cloud.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # Guard: destructive edits are gated on the connection having been
    # created during this MCP session.
    check_guid_created_in_session(connection_id)
    target_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)
    target_connection.set_table_prefix(prefix=prefix)
    result_message = (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {target_connection.connection_url}"
    )
    return result_message

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # Guard: destructive edits are gated on the connection having been
    # created during this MCP session.
    check_guid_created_in_session(connection_id)
    target_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)

    # Accept either a single stream name or a (possibly comma-separated)
    # list; normalize to an explicit list of strings before applying.
    stream_list: list[str] = resolve_list_of_strings(stream_names)
    target_connection.set_selected_streams(stream_names=stream_list)

    result_message = (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {stream_list}. URL: {target_connection.connection_url}"
    )
    return result_message

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

@mcp_tool(domain='cloud', read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_connection_artifact( connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], artifact_type: Annotated[Literal['state', 'catalog'], FieldInfo(annotation=NoneType, required=True, description="The type of artifact to retrieve: 'state' or 'catalog'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any] | list[dict[str, Any]]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the persisted state for incremental syncs as a list of
      stream state objects, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    target_workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)

    # Fetch the requested artifact; both fetchers return None when the
    # artifact does not exist, so the None-to-ERROR mapping is shared below.
    if artifact_type == "state":
        artifact = target_connection.get_state_artifacts()
        missing_message = "No state is set for this connection (stateType: not_set)"
    else:  # artifact_type == "catalog"
        artifact = target_connection.get_catalog_artifact()
        missing_message = "No catalog found for this connection"

    if artifact is None:
        return {"ERROR": missing_message}
    return artifact

Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:

  • 'state': Returns the persisted state for incremental syncs as a list of stream state objects, or {"ERROR": "..."} if no state is set.
  • 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.