airbyte.mcp.cloud

Airbyte Cloud MCP operations.

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any, Literal, cast
   6
   7from airbyte_api.models import JobTypeEnum
   8from fastmcp import Context, FastMCP
   9from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
  10from pydantic import BaseModel, Field
  11
  12from airbyte import cloud, get_destination, get_source
  13from airbyte._util import api_util
  14from airbyte.cloud.connectors import CustomCloudSourceDefinition
  15from airbyte.cloud.constants import FAILED_STATUSES
  16from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
  17from airbyte.constants import (
  18    MCP_CONFIG_API_URL,
  19    MCP_CONFIG_BEARER_TOKEN,
  20    MCP_CONFIG_CLIENT_ID,
  21    MCP_CONFIG_CLIENT_SECRET,
  22    MCP_CONFIG_WORKSPACE_ID,
  23)
  24from airbyte.destinations.util import get_noop_destination
  25from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  26from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
  27from airbyte.mcp._tool_utils import (
  28    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
  29    check_guid_created_in_session,
  30    register_guid_created_in_session,
  31)
  32from airbyte.secrets import SecretString
  33
  34
  35CLOUD_AUTH_TIP_TEXT = (
  36    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
  37    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
  38    "will be used to authenticate with the Airbyte Cloud API."
  39)
  40WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  41
  42
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud.

    Lightweight summary entry; `list_deployed_cloud_source_connectors` returns
    a list of these, one per deployed source.
    """

    # All three fields are required (no defaults are declared).
    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
  52
  53
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud.

    Lightweight summary entry; `list_deployed_cloud_destination_connectors`
    returns a list of these, one per deployed destination.
    """

    # All three fields are required (no defaults are declared).
    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
  63
  64
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud.

    The identity fields (`id`, `name`, `url`, `source_id`, `destination_id`)
    are required. The job-related fields all default to `None` and are
    documented as populated only when `with_connection_status=True`.
    """

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    # --- Optional status fields (default to None) ---
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
  92
  93
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud.

    Return type of `describe_cloud_source`. Compared with `CloudSourceResult`,
    the fields carry a `source_` prefix and the connector definition ID is
    included.
    """

    # All fields are required (no defaults are declared).
    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 105
 106
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud.

    Mirrors `CloudSourceDetails`, with `destination_`-prefixed fields plus the
    connector definition ID.
    """

    # All fields are required (no defaults are declared).
    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 118
 119
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud.

    Combines the connection's own identity (ID, name, URL) with the IDs and
    display names of its source and destination, the selected streams, and the
    table prefix.
    """

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    # Nullable, but no default is declared here (unlike the optional fields on
    # the sibling result models in this module).
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
 141
 142
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud.

    `id`, `name`, and `email` are required. The billing-related fields are
    optional: the two status strings default to `None` and `is_account_locked`
    defaults to `False`.
    """

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
    # --- Optional billing fields ---
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed')."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state."""
 162
 163
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud.

    Return type of `check_airbyte_cloud_workspace`. The organization-level
    fields depend on ORGANIZATION_READER permission; when organization info
    cannot be fetched, that tool leaves the optional ones at their defaults and
    fills `organization_id` with a placeholder message.
    """

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    # Required string: callers must supply a value even when the real ID is
    # unavailable (see the placeholder used by `check_airbyte_cloud_workspace`).
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed'). Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""
 189
 190
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support.

    Carries one window of log text for a specific job attempt, together with
    the window's position (1-based start line and line count) and the total
    number of lines available, so a caller can tell whether lines were left
    out.
    """

    # All fields are required (no defaults are declared).
    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""
 206
 207
class SyncJobResult(BaseModel):
    """Information about a sync job.

    One entry of `SyncJobListResult.jobs`; `list_cloud_sync_jobs` builds one of
    these per sync result, stringifying the job status and formatting the
    start time with `isoformat()`.
    """

    # All fields are required (no defaults are declared).
    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""
 223
 224
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support.

    Return type of `list_cloud_sync_jobs`. Besides the jobs themselves, it
    echoes the effective paging parameters (`jobs_offset`, `from_tail`) used
    for the request.
    """

    # All fields are required (no defaults are declared).
    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 236
 237
 238def _get_cloud_workspace(
 239    ctx: Context,
 240    workspace_id: str | None = None,
 241) -> CloudWorkspace:
 242    """Get an authenticated CloudWorkspace.
 243
 244    Resolves credentials from multiple sources via MCP config args in order:
 245    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 246    2. Environment variables
 247
 248    The ctx parameter provides access to MCP config values that are resolved
 249    from HTTP headers or environment variables based on the config args
 250    defined in server.py.
 251    """
 252    resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID)
 253    if not resolved_workspace_id:
 254        raise PyAirbyteInputError(
 255            message="Workspace ID is required but not provided.",
 256            guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.",
 257        )
 258
 259    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
 260    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
 261    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
 262    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
 263
 264    return CloudWorkspace(
 265        workspace_id=resolved_workspace_id,
 266        client_id=SecretString(client_id) if client_id else None,
 267        client_secret=SecretString(client_secret) if client_secret else None,
 268        bearer_token=SecretString(bearer_token) if bearer_token else None,
 269        api_root=api_url,
 270    )
 271
 272
 273@mcp_tool(
 274    open_world=True,
 275    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 276)
 277def deploy_source_to_cloud(
 278    ctx: Context,
 279    source_name: Annotated[
 280        str,
 281        Field(description="The name to use when deploying the source."),
 282    ],
 283    source_connector_name: Annotated[
 284        str,
 285        Field(description="The name of the source connector (e.g., 'source-faker')."),
 286    ],
 287    *,
 288    workspace_id: Annotated[
 289        str | None,
 290        Field(
 291            description=WORKSPACE_ID_TIP_TEXT,
 292            default=None,
 293        ),
 294    ],
 295    config: Annotated[
 296        dict | str | None,
 297        Field(
 298            description="The configuration for the source connector.",
 299            default=None,
 300        ),
 301    ],
 302    config_secret_name: Annotated[
 303        str | None,
 304        Field(
 305            description="The name of the secret containing the configuration.",
 306            default=None,
 307        ),
 308    ],
 309    unique: Annotated[
 310        bool,
 311        Field(
 312            description="Whether to require a unique name.",
 313            default=True,
 314        ),
 315    ],
 316) -> str:
 317    """Deploy a source connector to Airbyte Cloud."""
 318    source = get_source(
 319        source_connector_name,
 320        no_executor=True,
 321    )
 322    config_dict = resolve_connector_config(
 323        config=config,
 324        config_secret_name=config_secret_name,
 325        config_spec_jsonschema=source.config_spec,
 326    )
 327    source.set_config(config_dict, validate=True)
 328
 329    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 330    deployed_source = workspace.deploy_source(
 331        name=source_name,
 332        source=source,
 333        unique=unique,
 334    )
 335
 336    register_guid_created_in_session(deployed_source.connector_id)
 337    return (
 338        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 339        f" and URL: {deployed_source.connector_url}"
 340    )
 341
 342
 343@mcp_tool(
 344    open_world=True,
 345    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 346)
 347def deploy_destination_to_cloud(
 348    ctx: Context,
 349    destination_name: Annotated[
 350        str,
 351        Field(description="The name to use when deploying the destination."),
 352    ],
 353    destination_connector_name: Annotated[
 354        str,
 355        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 356    ],
 357    *,
 358    workspace_id: Annotated[
 359        str | None,
 360        Field(
 361            description=WORKSPACE_ID_TIP_TEXT,
 362            default=None,
 363        ),
 364    ],
 365    config: Annotated[
 366        dict | str | None,
 367        Field(
 368            description="The configuration for the destination connector.",
 369            default=None,
 370        ),
 371    ],
 372    config_secret_name: Annotated[
 373        str | None,
 374        Field(
 375            description="The name of the secret containing the configuration.",
 376            default=None,
 377        ),
 378    ],
 379    unique: Annotated[
 380        bool,
 381        Field(
 382            description="Whether to require a unique name.",
 383            default=True,
 384        ),
 385    ],
 386) -> str:
 387    """Deploy a destination connector to Airbyte Cloud."""
 388    destination = get_destination(
 389        destination_connector_name,
 390        no_executor=True,
 391    )
 392    config_dict = resolve_connector_config(
 393        config=config,
 394        config_secret_name=config_secret_name,
 395        config_spec_jsonschema=destination.config_spec,
 396    )
 397    destination.set_config(config_dict, validate=True)
 398
 399    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 400    deployed_destination = workspace.deploy_destination(
 401        name=destination_name,
 402        destination=destination,
 403        unique=unique,
 404    )
 405
 406    register_guid_created_in_session(deployed_destination.connector_id)
 407    return (
 408        f"Successfully deployed destination '{destination_name}' "
 409        f"with ID: {deployed_destination.connector_id}"
 410    )
 411
 412
 413@mcp_tool(
 414    open_world=True,
 415    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 416)
 417def create_connection_on_cloud(
 418    ctx: Context,
 419    connection_name: Annotated[
 420        str,
 421        Field(description="The name of the connection."),
 422    ],
 423    source_id: Annotated[
 424        str,
 425        Field(description="The ID of the deployed source."),
 426    ],
 427    destination_id: Annotated[
 428        str,
 429        Field(description="The ID of the deployed destination."),
 430    ],
 431    selected_streams: Annotated[
 432        str | list[str],
 433        Field(
 434            description=(
 435                "The selected stream names to sync within the connection. "
 436                "Must be an explicit stream name or list of streams. "
 437                "Cannot be empty or '*'."
 438            )
 439        ),
 440    ],
 441    *,
 442    workspace_id: Annotated[
 443        str | None,
 444        Field(
 445            description=WORKSPACE_ID_TIP_TEXT,
 446            default=None,
 447        ),
 448    ],
 449    table_prefix: Annotated[
 450        str | None,
 451        Field(
 452            description="Optional table prefix to use when syncing to the destination.",
 453            default=None,
 454        ),
 455    ],
 456) -> str:
 457    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 458    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 459    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 460    deployed_connection = workspace.deploy_connection(
 461        connection_name=connection_name,
 462        source=source_id,
 463        destination=destination_id,
 464        selected_streams=resolved_streams_list,
 465        table_prefix=table_prefix,
 466    )
 467
 468    register_guid_created_in_session(deployed_connection.connection_id)
 469    return (
 470        f"Successfully created connection '{connection_name}' "
 471        f"with ID '{deployed_connection.connection_id}' and "
 472        f"URL: {deployed_connection.connection_url}"
 473    )
 474
 475
 476@mcp_tool(
 477    open_world=True,
 478    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 479)
 480def run_cloud_sync(
 481    ctx: Context,
 482    connection_id: Annotated[
 483        str,
 484        Field(description="The ID of the Airbyte Cloud connection."),
 485    ],
 486    *,
 487    workspace_id: Annotated[
 488        str | None,
 489        Field(
 490            description=WORKSPACE_ID_TIP_TEXT,
 491            default=None,
 492        ),
 493    ],
 494    wait: Annotated[
 495        bool,
 496        Field(
 497            description=(
 498                "Whether to wait for the sync to complete. Since a sync can take between several "
 499                "minutes and several hours, this option is not recommended for most "
 500                "scenarios."
 501            ),
 502            default=False,
 503        ),
 504    ],
 505    wait_timeout: Annotated[
 506        int,
 507        Field(
 508            description="Maximum time to wait for sync completion (seconds).",
 509            default=300,
 510        ),
 511    ],
 512) -> str:
 513    """Run a sync job on Airbyte Cloud."""
 514    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 515    connection = workspace.get_connection(connection_id=connection_id)
 516    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 517
 518    if wait:
 519        status = sync_result.get_job_status()
 520        return (
 521            f"Sync completed with status: {status}. "
 522            f"Job ID is '{sync_result.job_id}' and "
 523            f"job URL is: {sync_result.job_url}"
 524        )
 525    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 526
 527
 528@mcp_tool(
 529    read_only=True,
 530    idempotent=True,
 531    open_world=True,
 532    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 533)
 534def check_airbyte_cloud_workspace(
 535    ctx: Context,
 536    *,
 537    workspace_id: Annotated[
 538        str | None,
 539        Field(
 540            description=WORKSPACE_ID_TIP_TEXT,
 541            default=None,
 542        ),
 543    ],
 544) -> CloudWorkspaceResult:
 545    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 546
 547    Returns workspace details including workspace ID, name, organization info, and billing status.
 548    """
 549    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 550
 551    # Get workspace details from the public API using workspace's credentials
 552    workspace_response = api_util.get_workspace(
 553        workspace_id=workspace.workspace_id,
 554        api_root=workspace.api_root,
 555        client_id=workspace.client_id,
 556        client_secret=workspace.client_secret,
 557        bearer_token=workspace.bearer_token,
 558    )
 559
 560    # Try to get organization info (including billing), but fail gracefully if we don't have
 561    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
 562    # organization, which may not be available with workspace-scoped credentials.
 563    organization = workspace.get_organization(raise_on_error=False)
 564
 565    return CloudWorkspaceResult(
 566        workspace_id=workspace_response.workspace_id,
 567        workspace_name=workspace_response.name,
 568        workspace_url=workspace.workspace_url,
 569        organization_id=(
 570            organization.organization_id
 571            if organization
 572            else "[unavailable - requires ORGANIZATION_READER permission]"
 573        ),
 574        organization_name=organization.organization_name if organization else None,
 575        payment_status=organization.payment_status if organization else None,
 576        subscription_status=organization.subscription_status if organization else None,
 577        is_account_locked=organization.is_account_locked if organization else False,
 578    )
 579
 580
 581@mcp_tool(
 582    open_world=True,
 583    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 584)
 585def deploy_noop_destination_to_cloud(
 586    ctx: Context,
 587    name: str = "No-op Destination",
 588    *,
 589    workspace_id: Annotated[
 590        str | None,
 591        Field(
 592            description=WORKSPACE_ID_TIP_TEXT,
 593            default=None,
 594        ),
 595    ],
 596    unique: bool = True,
 597) -> str:
 598    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 599    destination = get_noop_destination()
 600    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 601    deployed_destination = workspace.deploy_destination(
 602        name=name,
 603        destination=destination,
 604        unique=unique,
 605    )
 606    register_guid_created_in_session(deployed_destination.connector_id)
 607    return (
 608        f"Successfully deployed No-op Destination "
 609        f"with ID '{deployed_destination.connector_id}' and "
 610        f"URL: {deployed_destination.connector_url}"
 611    )
 612
 613
 614@mcp_tool(
 615    read_only=True,
 616    idempotent=True,
 617    open_world=True,
 618    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 619)
 620def get_cloud_sync_status(
 621    ctx: Context,
 622    connection_id: Annotated[
 623        str,
 624        Field(
 625            description="The ID of the Airbyte Cloud connection.",
 626        ),
 627    ],
 628    job_id: Annotated[
 629        int | None,
 630        Field(
 631            description="Optional job ID. If not provided, the latest job will be used.",
 632            default=None,
 633        ),
 634    ],
 635    *,
 636    workspace_id: Annotated[
 637        str | None,
 638        Field(
 639            description=WORKSPACE_ID_TIP_TEXT,
 640            default=None,
 641        ),
 642    ],
 643    include_attempts: Annotated[
 644        bool,
 645        Field(
 646            description="Whether to include detailed attempts information.",
 647            default=False,
 648        ),
 649    ],
 650) -> dict[str, Any]:
 651    """Get the status of a sync job from the Airbyte Cloud."""
 652    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 653    connection = workspace.get_connection(connection_id=connection_id)
 654
 655    # If a job ID is provided, get the job by ID.
 656    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 657
 658    if not sync_result:
 659        return {"status": None, "job_id": None, "attempts": []}
 660
 661    result = {
 662        "status": sync_result.get_job_status(),
 663        "job_id": sync_result.job_id,
 664        "bytes_synced": sync_result.bytes_synced,
 665        "records_synced": sync_result.records_synced,
 666        "start_time": sync_result.start_time.isoformat(),
 667        "job_url": sync_result.job_url,
 668        "attempts": [],
 669    }
 670
 671    if include_attempts:
 672        attempts = sync_result.get_attempts()
 673        result["attempts"] = [
 674            {
 675                "attempt_number": attempt.attempt_number,
 676                "attempt_id": attempt.attempt_id,
 677                "status": attempt.status,
 678                "bytes_synced": attempt.bytes_synced,
 679                "records_synced": attempt.records_synced,
 680                "created_at": attempt.created_at.isoformat(),
 681            }
 682            for attempt in attempts
 683        ]
 684
 685    return result
 686
 687
 688@mcp_tool(
 689    read_only=True,
 690    idempotent=True,
 691    open_world=True,
 692    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 693)
 694def list_cloud_sync_jobs(
 695    ctx: Context,
 696    connection_id: Annotated[
 697        str,
 698        Field(description="The ID of the Airbyte Cloud connection."),
 699    ],
 700    *,
 701    workspace_id: Annotated[
 702        str | None,
 703        Field(
 704            description=WORKSPACE_ID_TIP_TEXT,
 705            default=None,
 706        ),
 707    ],
 708    max_jobs: Annotated[
 709        int,
 710        Field(
 711            description=(
 712                "Maximum number of jobs to return. "
 713                "Defaults to 20 if not specified. "
 714                "Maximum allowed value is 500."
 715            ),
 716            default=20,
 717        ),
 718    ],
 719    from_tail: Annotated[
 720        bool | None,
 721        Field(
 722            description=(
 723                "When True, jobs are ordered newest-first (createdAt DESC). "
 724                "When False, jobs are ordered oldest-first (createdAt ASC). "
 725                "Defaults to True if `jobs_offset` is not specified. "
 726                "Cannot combine `from_tail=True` with `jobs_offset`."
 727            ),
 728            default=None,
 729        ),
 730    ],
 731    jobs_offset: Annotated[
 732        int | None,
 733        Field(
 734            description=(
 735                "Number of jobs to skip from the beginning. "
 736                "Cannot be combined with `from_tail=True`."
 737            ),
 738            default=None,
 739        ),
 740    ],
 741    job_type: Annotated[
 742        JobTypeEnum | None,
 743        Field(
 744            description=(
 745                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
 746                "If not specified, defaults to sync and reset jobs only (API default). "
 747                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
 748            ),
 749            default=None,
 750        ),
 751    ],
 752) -> SyncJobListResult:
 753    """List sync jobs for a connection with pagination support.
 754
 755    This tool allows you to retrieve a list of sync jobs for a connection,
 756    with control over ordering and pagination. By default, jobs are returned
 757    newest-first (from_tail=True).
 758    """
 759    # Validate that jobs_offset and from_tail are not both set
 760    if jobs_offset is not None and from_tail is True:
 761        raise PyAirbyteInputError(
 762            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
 763            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
 764        )
 765
 766    # Default to from_tail=True if neither is specified
 767    if from_tail is None and jobs_offset is None:
 768        from_tail = True
 769    elif from_tail is None:
 770        from_tail = False
 771
 772    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 773    connection = workspace.get_connection(connection_id=connection_id)
 774
 775    # Cap at 500 to avoid overloading agent context
 776    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 777
 778    sync_results = connection.get_previous_sync_logs(
 779        limit=effective_limit,
 780        offset=jobs_offset,
 781        from_tail=from_tail,
 782        job_type=job_type,
 783    )
 784
 785    jobs = [
 786        SyncJobResult(
 787            job_id=sync_result.job_id,
 788            status=str(sync_result.get_job_status()),
 789            bytes_synced=sync_result.bytes_synced,
 790            records_synced=sync_result.records_synced,
 791            start_time=sync_result.start_time.isoformat(),
 792            job_url=sync_result.job_url,
 793        )
 794        for sync_result in sync_results
 795    ]
 796
 797    return SyncJobListResult(
 798        jobs=jobs,
 799        jobs_count=len(jobs),
 800        jobs_offset=jobs_offset or 0,
 801        from_tail=from_tail,
 802    )
 803
 804
 805@mcp_tool(
 806    read_only=True,
 807    idempotent=True,
 808    open_world=True,
 809    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 810)
 811def list_deployed_cloud_source_connectors(
 812    ctx: Context,
 813    *,
 814    workspace_id: Annotated[
 815        str | None,
 816        Field(
 817            description=WORKSPACE_ID_TIP_TEXT,
 818            default=None,
 819        ),
 820    ],
 821    name_contains: Annotated[
 822        str | None,
 823        Field(
 824            description="Optional case-insensitive substring to filter sources by name",
 825            default=None,
 826        ),
 827    ],
 828    max_items_limit: Annotated[
 829        int | None,
 830        Field(
 831            description="Optional maximum number of items to return (default: no limit)",
 832            default=None,
 833        ),
 834    ],
 835) -> list[CloudSourceResult]:
 836    """List all deployed source connectors in the Airbyte Cloud workspace."""
 837    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 838    sources = workspace.list_sources()
 839
 840    # Filter by name if requested
 841    if name_contains:
 842        needle = name_contains.lower()
 843        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 844
 845    # Apply limit if requested
 846    if max_items_limit is not None:
 847        sources = sources[:max_items_limit]
 848
 849    # Note: name and url are guaranteed non-null from list API responses
 850    return [
 851        CloudSourceResult(
 852            id=source.source_id,
 853            name=cast(str, source.name),
 854            url=cast(str, source.connector_url),
 855        )
 856        for source in sources
 857    ]
 858
 859
 860@mcp_tool(
 861    read_only=True,
 862    idempotent=True,
 863    open_world=True,
 864    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 865)
 866def list_deployed_cloud_destination_connectors(
 867    ctx: Context,
 868    *,
 869    workspace_id: Annotated[
 870        str | None,
 871        Field(
 872            description=WORKSPACE_ID_TIP_TEXT,
 873            default=None,
 874        ),
 875    ],
 876    name_contains: Annotated[
 877        str | None,
 878        Field(
 879            description="Optional case-insensitive substring to filter destinations by name",
 880            default=None,
 881        ),
 882    ],
 883    max_items_limit: Annotated[
 884        int | None,
 885        Field(
 886            description="Optional maximum number of items to return (default: no limit)",
 887            default=None,
 888        ),
 889    ],
 890) -> list[CloudDestinationResult]:
 891    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 892    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 893    destinations = workspace.list_destinations()
 894
 895    # Filter by name if requested
 896    if name_contains:
 897        needle = name_contains.lower()
 898        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 899
 900    # Apply limit if requested
 901    if max_items_limit is not None:
 902        destinations = destinations[:max_items_limit]
 903
 904    # Note: name and url are guaranteed non-null from list API responses
 905    return [
 906        CloudDestinationResult(
 907            id=destination.destination_id,
 908            name=cast(str, destination.name),
 909            url=cast(str, destination.connector_url),
 910        )
 911        for destination in destinations
 912    ]
 913
 914
 915@mcp_tool(
 916    read_only=True,
 917    idempotent=True,
 918    open_world=True,
 919    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 920)
 921def describe_cloud_source(
 922    ctx: Context,
 923    source_id: Annotated[
 924        str,
 925        Field(description="The ID of the source to describe."),
 926    ],
 927    *,
 928    workspace_id: Annotated[
 929        str | None,
 930        Field(
 931            description=WORKSPACE_ID_TIP_TEXT,
 932            default=None,
 933        ),
 934    ],
 935) -> CloudSourceDetails:
 936    """Get detailed information about a specific deployed source connector."""
 937    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 938    source = workspace.get_source(source_id=source_id)
 939
 940    # Access name property to ensure _connector_info is populated
 941    source_name = cast(str, source.name)
 942
 943    return CloudSourceDetails(
 944        source_id=source.source_id,
 945        source_name=source_name,
 946        source_url=source.connector_url,
 947        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 948    )
 949
 950
 951@mcp_tool(
 952    read_only=True,
 953    idempotent=True,
 954    open_world=True,
 955    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 956)
 957def describe_cloud_destination(
 958    ctx: Context,
 959    destination_id: Annotated[
 960        str,
 961        Field(description="The ID of the destination to describe."),
 962    ],
 963    *,
 964    workspace_id: Annotated[
 965        str | None,
 966        Field(
 967            description=WORKSPACE_ID_TIP_TEXT,
 968            default=None,
 969        ),
 970    ],
 971) -> CloudDestinationDetails:
 972    """Get detailed information about a specific deployed destination connector."""
 973    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 974    destination = workspace.get_destination(destination_id=destination_id)
 975
 976    # Access name property to ensure _connector_info is populated
 977    destination_name = cast(str, destination.name)
 978
 979    return CloudDestinationDetails(
 980        destination_id=destination.destination_id,
 981        destination_name=destination_name,
 982        destination_url=destination.connector_url,
 983        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 984    )
 985
 986
 987@mcp_tool(
 988    read_only=True,
 989    idempotent=True,
 990    open_world=True,
 991    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 992)
 993def describe_cloud_connection(
 994    ctx: Context,
 995    connection_id: Annotated[
 996        str,
 997        Field(description="The ID of the connection to describe."),
 998    ],
 999    *,
1000    workspace_id: Annotated[
1001        str | None,
1002        Field(
1003            description=WORKSPACE_ID_TIP_TEXT,
1004            default=None,
1005        ),
1006    ],
1007) -> CloudConnectionDetails:
1008    """Get detailed information about a specific deployed connection."""
1009    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1010    connection = workspace.get_connection(connection_id=connection_id)
1011
1012    return CloudConnectionDetails(
1013        connection_id=connection.connection_id,
1014        connection_name=cast(str, connection.name),
1015        connection_url=cast(str, connection.connection_url),
1016        source_id=connection.source_id,
1017        source_name=cast(str, connection.source.name),
1018        destination_id=connection.destination_id,
1019        destination_name=cast(str, connection.destination.name),
1020        selected_streams=connection.stream_names,
1021        table_prefix=connection.table_prefix,
1022    )
1023
1024
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Flow:
    #   1. Validate the windowing arguments before any remote call is made.
    #   2. Resolve the sync job and pick the requested attempt, or the one with the
    #      highest attempt number when `attempt_number` is omitted.
    #   3. Return a window of the attempt's log text plus line-count metadata.
    #
    # Raises:
    #   PyAirbyteInputError: `from_tail=True` combined with `line_offset`, or a
    #       negative `max_lines` / `line_offset`.
    #   AirbyteMissingResourceError: no sync job, no attempts, or no attempt with
    #       the requested number.

    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # Negative values would otherwise yield a meaningless window: a reported start
    # line beyond the end of the log, or a slice counted from the end of the log.
    if max_lines < 0:
        raise PyAirbyteInputError(
            message="'max_lines' must be 0 (no limit) or a positive integer.",
            context={"max_lines": max_lines},
        )
    if line_offset is not None and line_offset < 0:
        raise PyAirbyteInputError(
            message="'line_offset' must be 0 or a positive integer.",
            context={"line_offset": line_offset},
        )

    # With neither option given, read from the end of the log.
    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is None:
        # No explicit attempt requested: use the one with the highest attempt number.
        target_attempt = max(attempts, key=lambda a: a.attempt_number)
    else:
        target_attempt = next(
            (a for a in attempts if a.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return a placeholder message with zero lines rather than an empty string.
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    if from_tail:
        # Everything from `start_index` onward is at most `effective_max` lines,
        # so no second slice is needed.
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )
1171
1172
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested (case-insensitive; unnamed connections never match)
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        # Enforce the cap before processing each connection. Checking only after an
        # append would let `max_items_limit=0` return one item. The limit is applied
        # to the filtered results, and checking here also skips the status lookup
        # for connections that would be discarded anyway.
        if max_items_limit is not None and len(results) >= max_items_limit:
            break

        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            # Only the 5 most recent sync records are inspected per connection.
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    # Record the in-progress job and keep looking for a completed one.
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                # First completed job encountered: this is the one that is reported.
                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            # In failing-only mode, drop connections with no completed job among the
            # inspected records or whose last completed job is not a failed status.
            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

    return results
1292
1293
def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> CloudOrganization:
    """Look up one organization, selected by ID or by exact name.

    Exactly one of `organization_id` and `organization_name` must be given. The
    organizations listed for the authenticated user are fetched and searched for
    the requested one.

    Args:
        organization_id: ID of the organization to select.
        organization_name: Name of the organization to select; compared with `==`,
            so the match is exact and case-sensitive.
        api_root: The API root URL.
        client_id: OAuth client ID, forwarded to the API helper and to the result.
        client_secret: OAuth client secret, forwarded likewise.
        bearer_token: Bearer token, forwarded likewise.

    Returns:
        A `CloudOrganization` built from the matching record together with the
        supplied API root and credentials.

    Raises:
        PyAirbyteInputError: If both or neither selector is provided, or if more
            than one organization has the requested name.
        AirbyteMissingResourceError: If no organization matches the ID or name.
    """
    id_given = bool(organization_id)
    name_given = bool(organization_name)

    if id_given and name_given:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not (id_given or name_given):
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Fetch the organizations listed for the user, then search them locally.
    visible_orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )

    if id_given:
        candidates = [org for org in visible_orgs if org.organization_id == organization_id]
        if not candidates:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
    else:
        candidates = [org for org in visible_orgs if org.organization_name == organization_name]
        if not candidates:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_name": organization_name,
                    "message": f"No organization found with exact name '{organization_name}' "
                    "for the current user.",
                },
            )
        # Refuse to guess between several organizations sharing one name.
        if len(candidates) > 1:
            raise PyAirbyteInputError(
                message=f"Multiple organizations found with name '{organization_name}'. "
                "Please use 'organization_id' instead to specify the exact organization."
            )

    selected = candidates[0]

    # Credentials are passed along so the returned object can load billing info lazily.
    return CloudOrganization(
        organization_id=selected.organization_id,
        organization_name=selected.organization_name,
        email=selected.email,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )
1385
1386
def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> str:
    """Return only the ID of the organization selected by ID or exact name.

    Every argument is forwarded unchanged to `_resolve_organization`, so any error
    raised there propagates to the caller.
    """
    return _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    ).organization_id
1409
1410
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    raw_bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    raw_client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    raw_client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    # Fall back to the default cloud API root when no URL is configured.
    api_root = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Wrap each configured credential once; unset (falsy) values are passed as None.
    client_id_secret = SecretString(raw_client_id) if raw_client_id else None
    client_secret_secret = SecretString(raw_client_secret) if raw_client_secret else None
    bearer_token_secret = SecretString(raw_bearer_token) if raw_bearer_token else None

    # Validates the id/name selector and turns it into a concrete organization ID.
    target_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
    )

    # Name filtering and the item cap are delegated to the API helper.
    workspace_records = api_util.list_workspaces_in_organization(
        organization_id=target_org_id,
        api_root=api_root,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    # Each record is read with `.get`; a missing key becomes an empty string.
    listing = [
        CloudWorkspaceResult(
            workspace_id=record.get("workspaceId", ""),
            workspace_name=record.get("name", ""),
            organization_id=record.get("organizationId", ""),
        )
        for record in workspace_records
    ]
    return listing
1489
1490
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    raw_bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    raw_client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    raw_client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    # Fall back to the default cloud API root when no URL is configured.
    api_root = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Unset (falsy) credentials are passed as None rather than wrapped.
    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=SecretString(raw_client_id) if raw_client_id else None,
        client_secret=SecretString(raw_client_secret) if raw_client_secret else None,
        bearer_token=SecretString(raw_bearer_token) if raw_bearer_token else None,
    )

    # The billing-related properties on CloudOrganization are loaded lazily, so
    # reading them here is what triggers that loading.
    summary = CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
        payment_status=organization.payment_status,
        subscription_status=organization.subscription_status,
        is_account_locked=organization.is_account_locked,
    )
    return summary
1545
1546
def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    """Format a custom source definition as a newline-separated bullet list.

    The list contains the definition's name, ID and version, followed by its
    Connector Builder project ID and URL. There is no trailing newline.
    """
    summary_lines = (
        f" - Custom Source Name: {custom_source.name}",
        f" - Definition ID: {custom_source.definition_id}",
        f" - Definition Version: {custom_source.version}",
        f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
        f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
    )
    return "\n".join(summary_lines)
1559
1560
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Heuristic: a string without any newline is treated as a file path. Strings
    # containing newlines, `Path` objects and None are passed through untouched.
    manifest_input: str | Path | None = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    # Testing values are resolved only when inline values and/or a secret name were
    # supplied. An empty resolved config is normalised to None.
    resolved_testing_values: dict[str, Any] | None = None
    nothing_supplied = testing_values is None and testing_values_secret_name is None
    if not nothing_supplied:
        resolved_config = resolve_connector_config(
            config=testing_values,
            config_secret_name=testing_values_secret_name,
        )
        resolved_testing_values = resolved_config or None

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    published = cloud_workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest_input,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=resolved_testing_values,
    )

    # Record the new definition ID as having been created during this session.
    register_guid_created_in_session(published.definition_id)

    description = _get_custom_source_definition_description(
        custom_source=published,
    )
    return "Successfully published custom YAML source definition:\n" + description + "\n"
1666
1667
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    yaml_definitions = cloud_workspace.list_custom_source_definitions(definition_type="yaml")

    # Each definition is reduced to a plain dict of summary fields. The manifest is
    # not included here.
    summaries: list[dict[str, Any]] = [
        {
            "definition_id": definition.definition_id,
            "name": definition.name,
            "version": definition.version,
            "connector_builder_project_url": definition.connector_builder_project_url,
        }
        for definition in yaml_definitions
    ]
    return summaries
1703
1704
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the manifest YAML content,
    which can be used to inspect or store the connector configuration locally.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    fetched = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    # Flatten the definition into a plain dict, including the manifest itself.
    payload: dict[str, Any] = {
        "definition_id": fetched.definition_id,
        "name": fetched.name,
        "version": fetched.version,
        "connector_builder_project_id": fetched.connector_builder_project_id,
        "connector_builder_project_url": fetched.connector_builder_project_url,
        "manifest": fetched.manifest,
    }
    return payload
1747
1748
@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Reject calls that carry nothing to update.
    update_inputs = (manifest_yaml, testing_values, testing_values_secret_name)
    if all(value is None for value in update_inputs):
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": cloud_workspace.workspace_id,
            },
        )

    # A string without any newline is treated as a file path rather than inline YAML.
    manifest_input: str | Path | None
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest_input = Path(manifest_yaml)
    else:
        manifest_input = manifest_yaml

    # Resolve testing values from the inline config and/or the named secret.
    # A falsy result (e.g. an empty dict) is normalized to None so it is not applied.
    resolved_testing_values: dict[str, Any] | None = None
    if not (testing_values is None and testing_values_secret_name is None):
        resolved = resolve_connector_config(
            config=testing_values,
            config_secret_name=testing_values_secret_name,
        )
        resolved_testing_values = resolved or None

    target: CustomCloudSourceDefinition = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    if manifest_input is not None:
        # Continue with the object returned by the manifest update.
        target = target.update_definition(
            manifest_yaml=manifest_input,
            pre_validate=pre_validate,
        )

    if resolved_testing_values is not None:
        target.set_testing_values(resolved_testing_values)

    description = _get_custom_source_definition_description(
        custom_source=target,
    )
    return "Successfully updated custom YAML source definition:\n" + description
1869
1870
@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target = cloud_workspace.get_custom_source_definition(
        definition_type="yaml",
        definition_id=definition_id,
    )
    actual_name: str = target.name

    if actual_name == name:
        # Safe mode stays hard-coded on as extra protection when driven by an LLM agent.
        target.permanently_delete(safe_mode=True)
        return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"

    # The caller-supplied name did not match the stored one: refuse to delete.
    raise PyAirbyteInputError(
        message=(
            f"Name mismatch: expected '{name}' but found '{actual_name}'. "
            "The provided name must exactly match the definition's actual name. "
            "This is a safety measure to prevent accidental deletion."
        ),
        context={
            "definition_id": definition_id,
            "expected_name": name,
            "actual_name": actual_name,
        },
    )
1936
1937
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    cloud_source = cloud_workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, cloud_source.name)

    if actual_name == name:
        # Safe mode stays hard-coded on as extra protection when driven by an LLM agent;
        # it requires the name to contain "delete-me" or "deleteme" (case insensitive).
        cloud_workspace.permanently_delete_source(
            source=source_id,
            safe_mode=True,
        )
        return f"Successfully deleted source '{actual_name}' (ID: {source_id})"

    # The caller-supplied name did not match the stored one: refuse to delete.
    raise PyAirbyteInputError(
        message=(
            f"Name mismatch: expected '{name}' but found '{actual_name}'. "
            "The provided name must exactly match the source's actual name. "
            "This is a safety measure to prevent accidental deletion."
        ),
        context={
            "source_id": source_id,
            "expected_name": name,
            "actual_name": actual_name,
        },
    )
1992
1993
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, cloud_destination.name)

    if actual_name == name:
        # Safe mode stays hard-coded on as extra protection when driven by an LLM agent;
        # it requires a name-based delete disposition ("delete-me" or "deleteme").
        cloud_workspace.permanently_delete_destination(
            destination=destination_id,
            safe_mode=True,
        )
        return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"

    # The caller-supplied name did not match the stored one: refuse to delete.
    raise PyAirbyteInputError(
        message=(
            f"Name mismatch: expected '{name}' but found '{actual_name}'. "
            "The provided name must exactly match the destination's actual name. "
            "This is a safety measure to prevent accidental deletion."
        ),
        context={
            "destination_id": destination_id,
            "expected_name": name,
            "actual_name": actual_name,
        },
    )
2048
2049
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, cloud_connection.name)

    if actual_name == name:
        # Safe mode stays hard-coded on as extra protection when driven by an LLM agent;
        # it requires a name-based delete disposition ("delete-me" or "deleteme").
        cloud_workspace.permanently_delete_connection(
            connection=connection_id,
            cascade_delete_source=cascade_delete_source,
            cascade_delete_destination=cascade_delete_destination,
            safe_mode=True,
        )
        return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"

    # The caller-supplied name did not match the stored one: refuse to delete.
    raise PyAirbyteInputError(
        message=(
            f"Name mismatch: expected '{name}' but found '{actual_name}'. "
            "The provided name must exactly match the connection's actual name. "
            "This is a safety measure to prevent accidental deletion."
        ),
        context={
            "connection_id": connection_id,
            "expected_name": name,
            "actual_name": actual_name,
        },
    )
2125
2126
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)
    cloud_source.rename(name=name)

    # The URL is read only after the rename call has returned.
    source_url = cloud_source.connector_url
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source_url}"
2155
2156
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(description="New configuration for the source connector."),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # No connector spec is available at this point, so none is passed to the resolver.
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_source.update_config(config=resolved_config)

    source_url = cloud_source.connector_url
    return f"Successfully updated source '{source_id}'. URL: {source_url}"
2207
2208
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)
    cloud_destination.rename(name=name)

    # The URL is read only after the rename call has returned.
    destination_url = cloud_destination.connector_url
    return f"Successfully renamed destination '{destination_id}' to '{name}'. URL: {destination_url}"
2240
2241
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    # The Python-level `= None` mirrors the `Field(default=None)` metadata so the
    # parameter is also optional when this function is called directly, matching
    # `update_cloud_source_config`.
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Runs before any workspace or destination lookup.
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Combine the inline config and/or the named secret into one config object.
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return f"Successfully updated destination '{destination_id}'. URL: {destination.connector_url}"
2294
2295
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    cloud_connection.rename(name=name)

    # The URL is read only after the rename call has returned.
    connection_url = cloud_connection.connection_url
    return f"Successfully renamed connection '{connection_id}' to '{name}'. URL: {connection_url}"
2327
2328
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    cloud_connection.set_table_prefix(prefix=prefix)

    # The URL is read only after the prefix update has returned.
    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection_url}"
    )
2366
2367
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # Normalize the single-name-or-list input to a list before applying it.
    selected: list[str] = resolve_list_of_strings(stream_names)
    cloud_connection.set_selected_streams(stream_names=selected)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {selected}. URL: {connection_url}"
    )
2413
2414
@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive.
    """
    check_guid_created_in_session(connection_id)

    # Validate that at least one setting is provided
    if enabled is None and cron_expression is None and manual_schedule is None:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # `manual_schedule=False` is not acted on anywhere below. Passed alone it used to
    # fall through every branch and still report success with an empty change list,
    # so reject it before any remote call is made.
    if enabled is None and cron_expression is None and manual_schedule is not True:
        raise ValueError(
            "'manual_schedule=False' does not change the schedule on its own. "
            "Provide 'cron_expression' to set a schedule, 'manual_schedule=True' "
            "for manual-only syncs, or 'enabled' to change the connection status."
        )

    # Validate mutually exclusive schedule options
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Human-readable descriptions of each change applied, for the returned summary.
    changes_made: list[str] = []

    # Apply enabled status change
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change (the two options were validated as mutually exclusive above)
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )
2523
2524
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the full raw connection state including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    if artifact_type != "state":
        # Any value other than "state" is handled as a catalog request.
        raw_catalog = cloud_connection.dump_raw_catalog()
        if raw_catalog is None:
            return {"ERROR": "No catalog found for this connection"}
        return raw_catalog

    raw_state = cloud_connection.dump_raw_state()
    # A "not_set" state type is reported as an error payload instead of the raw state.
    if raw_state.get("stateType") == "not_set":
        return {"ERROR": "No state is set for this connection (stateType: not_set)"}
    return raw_state
2572
2573
def register_cloud_tools(app: FastMCP) -> None:
    """Register this module's MCP tools on the given FastMCP app.

    The `workspace_id` argument is excluded from the registered tools when
    `AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET` is truthy; otherwise nothing is excluded.

    Args:
        app: FastMCP application instance
    """
    excluded_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None
    register_mcp_tools(app, mcp_module=__name__, exclude_args=excluded_args)
# Help text describing the environment variables used for Airbyte Cloud authentication.
CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
# Shared description text for the `workspace_id` tool argument.
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
class CloudSourceResult(pydantic.main.BaseModel):
44class CloudSourceResult(BaseModel):
45    """Information about a deployed source connector in Airbyte Cloud."""
46
47    id: str
48    """The source ID."""
49    name: str
50    """Display name of the source."""
51    url: str
52    """Web URL for managing this source in Airbyte Cloud."""

Information about a deployed source connector in Airbyte Cloud.

id: str = PydanticUndefined

The source ID.

name: str = PydanticUndefined

Display name of the source.

url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

class CloudDestinationResult(pydantic.main.BaseModel):
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """Identifier of the deployed destination."""
    name: str
    """Human-readable display name of the destination."""
    url: str
    """Airbyte Cloud web URL where this destination can be managed."""

Information about a deployed destination connector in Airbyte Cloud.

id: str = PydanticUndefined

The destination ID.

name: str = PydanticUndefined

Display name of the destination.

url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

class CloudConnectionResult(pydantic.main.BaseModel):
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """Identifier of the connection."""
    name: str
    """Human-readable display name of the connection."""
    url: str
    """Airbyte Cloud web URL where this connection can be managed."""
    source_id: str
    """Identifier of the source this connection reads from."""
    destination_id: str
    """Identifier of the destination this connection writes to."""

    # The remaining fields all default to None and are only filled in when
    # with_connection_status=True.
    last_job_status: str | None = None
    """Status of the latest completed sync job, e.g. 'succeeded', 'failed' or 'cancelled'.
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the latest completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """Timestamp (ISO 8601) of the latest completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of the sync that is running right now, if there is one.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """Timestamp (ISO 8601) at which the currently running sync started.
    Only populated when with_connection_status=True."""

Information about a deployed connection in Airbyte Cloud.

id: str = PydanticUndefined

The connection ID.

name: str = PydanticUndefined

Display name of the connection.

url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

last_job_status: str | None = None

Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.

last_job_id: int | None = None

Job ID of the most recent completed sync. Only populated when with_connection_status=True.

last_job_time: str | None = None

ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.

currently_running_job_id: int | None = None

Job ID of a currently running sync, if any. Only populated when with_connection_status=True.

currently_running_job_start_time: str | None = None

ISO 8601 timestamp of when the currently running sync started. Only populated when with_connection_status=True.

class CloudSourceDetails(pydantic.main.BaseModel):
 95class CloudSourceDetails(BaseModel):
 96    """Detailed information about a deployed source connector in Airbyte Cloud."""
 97
 98    source_id: str
 99    """The source ID."""
100    source_name: str
101    """Display name of the source."""
102    source_url: str
103    """Web URL for managing this source in Airbyte Cloud."""
104    connector_definition_id: str
105    """The connector definition ID (e.g., the ID for 'source-postgres')."""

Detailed information about a deployed source connector in Airbyte Cloud.

source_id: str = PydanticUndefined

The source ID.

source_name: str = PydanticUndefined

Display name of the source.

source_url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'source-postgres').

class CloudDestinationDetails(pydantic.main.BaseModel):
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """Identifier of the deployed destination."""
    destination_name: str
    """Human-readable display name of the destination."""
    destination_url: str
    """Airbyte Cloud web URL where this destination can be managed."""
    connector_definition_id: str
    """ID of the connector definition behind this destination
    (e.g., the ID for 'destination-snowflake')."""

Detailed information about a deployed destination connector in Airbyte Cloud.

destination_id: str = PydanticUndefined

The destination ID.

destination_name: str = PydanticUndefined

Display name of the destination.

destination_url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'destination-snowflake').

class CloudConnectionDetails(pydantic.main.BaseModel):
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """Identifier of the connection."""
    connection_name: str
    """Human-readable display name of the connection."""
    connection_url: str
    """Airbyte Cloud web URL where this connection can be managed."""
    source_id: str
    """Identifier of the source this connection reads from."""
    source_name: str
    """Human-readable display name of that source."""
    destination_id: str
    """Identifier of the destination this connection writes to."""
    destination_name: str
    """Human-readable display name of that destination."""
    selected_streams: list[str]
    """Names of the streams selected for syncing."""
    # Nullable but declared without a default, so a value (possibly None) must be supplied.
    table_prefix: str | None
    """Prefix applied to table names when syncing to the destination."""

Detailed information about a deployed connection in Airbyte Cloud.

connection_id: str = PydanticUndefined

The connection ID.

connection_name: str = PydanticUndefined

Display name of the connection.

connection_url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

source_name: str = PydanticUndefined

Display name of the source.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

destination_name: str = PydanticUndefined

Display name of the destination.

selected_streams: list[str] = PydanticUndefined

List of stream names selected for syncing.

table_prefix: str | None = PydanticUndefined

Table prefix applied when syncing to the destination.

class CloudOrganizationResult(pydantic.main.BaseModel):
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """Identifier of the organization."""
    name: str
    """Human-readable display name of the organization."""
    email: str
    """Email address associated with the organization."""
    payment_status: str | None = None
    """Organization payment status, e.g. 'okay', 'grace_period', 'disabled' or 'locked'.
    A value of 'disabled' means syncs are blocked because of unpaid invoices."""
    subscription_status: str | None = None
    """Organization subscription status, e.g. 'pre_subscription', 'subscribed' or
    'unsubscribed'."""
    is_account_locked: bool = False
    """True when the account is locked because of billing issues, i.e. payment_status is
    'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Stays False unless there is affirmative evidence of a locked state."""

Information about an organization in Airbyte Cloud.

id: str = PydanticUndefined

The organization ID.

name: str = PydanticUndefined

Display name of the organization.

email: str = PydanticUndefined

Email associated with the organization.

payment_status: str | None = None

Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices.

subscription_status: str | None = None

Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 'unsubscribed').

is_account_locked: bool = False

Whether the account is locked due to billing issues. True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.

class CloudWorkspaceResult(pydantic.main.BaseModel):
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """Identifier of the workspace."""
    workspace_name: str
    """Human-readable display name of the workspace."""
    workspace_url: str | None = None
    """URL for opening the workspace in Airbyte Cloud."""

    # Every organization-level field below requires ORGANIZATION_READER permission.
    organization_id: str
    """Identifier of the owning organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Display name of the owning organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Organization payment status, e.g. 'okay', 'grace_period', 'disabled' or 'locked'.
    A value of 'disabled' means syncs are blocked because of unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Organization subscription status, e.g. 'pre_subscription', 'subscribed' or
    'unsubscribed'. Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """True when the account is locked because of billing issues, i.e. payment_status is
    'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Stays False unless there is affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""

Information about a workspace in Airbyte Cloud.

workspace_id: str = PydanticUndefined

The workspace ID.

workspace_name: str = PydanticUndefined

Display name of the workspace.

workspace_url: str | None = None

URL to access the workspace in Airbyte Cloud.

organization_id: str = PydanticUndefined

ID of the organization (requires ORGANIZATION_READER permission).

organization_name: str | None = None

Name of the organization (requires ORGANIZATION_READER permission).

payment_status: str | None = None

Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices. Requires ORGANIZATION_READER permission.

subscription_status: str | None = None

Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 'unsubscribed'). Requires ORGANIZATION_READER permission.

is_account_locked: bool = False

Whether the account is locked due to billing issues. True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state. Requires ORGANIZATION_READER permission.

class LogReadResult(pydantic.main.BaseModel):
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """ID of the job these logs belong to."""
    attempt_number: int
    """Attempt number these logs belong to."""
    log_text: str
    """The returned log text, as a single string."""
    log_text_start_line: int
    """Line index (1-based) of the first returned line."""
    log_text_line_count: int
    """How many lines are being returned."""
    total_log_lines_available: int
    """How many log lines exist in total; compare with the returned count to see whether
    any lines were left out due to the limit."""

Result of reading sync logs with pagination support.

job_id: int = PydanticUndefined

The job ID the logs belong to.

attempt_number: int = PydanticUndefined

The attempt number the logs belong to.

log_text: str = PydanticUndefined

The string containing the log text we are returning.

log_text_start_line: int = PydanticUndefined

1-based line index of the first line returned.

log_text_line_count: int = PydanticUndefined

Count of lines we are returning.

total_log_lines_available: int = PydanticUndefined

Total number of log lines available, shows if any lines were missed due to the limit.

class SyncJobResult(pydantic.main.BaseModel):
class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """Identifier of the job."""
    status: str
    """Status of the job, e.g. 'succeeded', 'failed', 'running' or 'pending'."""
    bytes_synced: int
    """How many bytes this job synced."""
    records_synced: int
    """How many records this job synced."""
    start_time: str
    """Timestamp (ISO 8601) at which the job started."""
    job_url: str
    """URL for viewing the job in Airbyte Cloud."""

Information about a sync job.

job_id: int = PydanticUndefined

The job ID.

status: str = PydanticUndefined

The job status (e.g., 'succeeded', 'failed', 'running', 'pending').

bytes_synced: int = PydanticUndefined

Number of bytes synced in this job.

records_synced: int = PydanticUndefined

Number of records synced in this job.

start_time: str = PydanticUndefined

ISO 8601 timestamp of when the job started.

job_url: str = PydanticUndefined

URL to view the job in Airbyte Cloud.

class SyncJobListResult(pydantic.main.BaseModel):
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """The sync jobs included in this response."""
    jobs_count: int
    """How many jobs this response contains."""
    jobs_offset: int
    """Offset that was applied to this request (0 when none was specified)."""
    from_tail: bool
    """True when jobs are ordered newest-first, False when ordered oldest-first."""

Result of listing sync jobs with pagination support.

jobs: list[SyncJobResult] = PydanticUndefined

List of sync jobs.

jobs_count: int = PydanticUndefined

Number of jobs returned in this response.

jobs_offset: int = PydanticUndefined

Offset used for this request (0 if not specified).

from_tail: bool = PydanticUndefined

Whether jobs are ordered newest-first (True) or oldest-first (False).

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud( ctx: fastmcp.server.context.Context, source_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the source.')], source_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the source connector (e.g., 'source-faker').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud(
    ctx: Context,
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    config: Annotated[
        dict | str | None,
        Field(description="The configuration for the source connector.", default=None),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(description="The name of the secret containing the configuration.", default=None),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name.", default=True),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # Created with no_executor=True: in this tool the connector object is only used for its
    # config spec, for validating the config, and as the payload handed to the workspace.
    connector = get_source(source_connector_name, no_executor=True)

    # Merge the inline `config` and the named secret into one dict, resolved against
    # the connector's JSON-schema spec, then validate it before touching the Cloud API.
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=connector.config_spec,
    )
    connector.set_config(resolved_config, validate=True)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.deploy_source(
        name=source_name,
        source=connector,
        unique=unique,
    )

    # Record the new ID as created in this session.
    new_source_id = cloud_source.connector_id
    register_guid_created_in_session(new_source_id)

    return (
        f"Successfully deployed source '{source_name}' with ID '{new_source_id}'"
        f" and URL: {cloud_source.connector_url}"
    )

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_destination_to_cloud( ctx: fastmcp.server.context.Context, destination_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name to use when deploying the destination.')], destination_connector_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description="The name of the destination connector (e.g., 'destination-postgres').")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], config: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    ctx: Context,
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    # Created with no_executor=True: in this tool the connector object is only used for its
    # config spec, for validating the config, and as the payload handed to the workspace.
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )

    # Resolve the inline `config` and/or the named secret against the connector's
    # JSON-schema spec, then validate before any Cloud API call is made.
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    # Record the new ID as created in this session.
    register_guid_created_in_session(deployed_destination.connector_id)

    # Report both the ID and the web URL, matching the messages returned by
    # `deploy_source_to_cloud` and `deploy_noop_destination_to_cloud`.
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud( ctx: fastmcp.server.context.Context, connection_name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name of the connection.')], source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source.')], destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination.')], selected_streams: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description="The selected stream names to sync within the connection. Must be an explicit stream name or list of streams. Cannot be empty or '*'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], table_prefix: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional table prefix to use when syncing to the destination.')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud(
    ctx: Context,
    connection_name: Annotated[str, Field(description="The name of the connection.")],
    source_id: Annotated[str, Field(description="The ID of the deployed source.")],
    destination_id: Annotated[str, Field(description="The ID of the deployed destination.")],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    # `selected_streams` may arrive as a single string or a list; normalise it to a
    # list before the workspace is resolved.
    stream_names: list[str] = resolve_list_of_strings(selected_streams)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    new_connection = cloud_workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=stream_names,
        table_prefix=table_prefix,
    )

    # Record the new ID as created in this session.
    new_connection_id = new_connection.connection_id
    register_guid_created_in_session(new_connection_id)

    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{new_connection_id}' and "
        f"URL: {new_connection.connection_url}"
    )

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], wait: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to wait for the sync to complete. Since a sync can take between several minutes and several hours, this option is not recommended for most scenarios.')], wait_timeout: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=300, description='Maximum time to wait for sync completion (seconds).')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    sync_job = cloud_connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Fire-and-forget mode: report the job handle without querying its status.
    if not wait:
        return (
            f"Sync started. Job ID is '{sync_job.job_id}' "
            f"and job URL is: {sync_job.job_url}"
        )

    # `wait` was requested, so include the job status in the reply.
    final_status = sync_job.get_job_status()
    return (
        f"Sync completed with status: {final_status}. "
        f"Job ID is '{sync_job.job_id}' and "
        f"job URL is: {sync_job.job_url}"
    )

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def check_airbyte_cloud_workspace( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudWorkspaceResult:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def check_airbyte_cloud_workspace(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, organization info, and billing status.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Look the workspace up through the public API, reusing the workspace's own credentials.
    workspace_info = api_util.get_workspace(
        workspace_id=cloud_workspace.workspace_id,
        api_root=cloud_workspace.api_root,
        client_id=cloud_workspace.client_id,
        client_secret=cloud_workspace.client_secret,
        bearer_token=cloud_workspace.bearer_token,
    )

    # Organization info (including billing) needs ORGANIZATION_READER permission on the
    # organization, which workspace-scoped credentials may lack. With raise_on_error=False
    # the lookup degrades gracefully instead of failing the whole tool call.
    org = cloud_workspace.get_organization(raise_on_error=False)

    if not org:
        # No organization available: fall back to a placeholder ID and neutral defaults.
        return CloudWorkspaceResult(
            workspace_id=workspace_info.workspace_id,
            workspace_name=workspace_info.name,
            workspace_url=cloud_workspace.workspace_url,
            organization_id="[unavailable - requires ORGANIZATION_READER permission]",
            organization_name=None,
            payment_status=None,
            subscription_status=None,
            is_account_locked=False,
        )

    return CloudWorkspaceResult(
        workspace_id=workspace_info.workspace_id,
        workspace_name=workspace_info.name,
        workspace_url=cloud_workspace.workspace_url,
        organization_id=org.organization_id,
        organization_name=org.organization_name,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, organization info, and billing status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_noop_destination_to_cloud( ctx: fastmcp.server.context.Context, name: str = 'No-op Destination', *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], unique: bool = True) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    ctx: Context,
    # `name` and `unique` keep their plain Python defaults so existing direct callers are
    # unaffected; the Field metadata only adds descriptions, as on the sibling deploy tools.
    name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ] = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name."),
    ] = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    # The destination comes pre-built from `get_noop_destination()`, so unlike the other
    # deploy tools there is no connector name or config to resolve here.
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )

    # Record the new ID as created in this session.
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(
            default=None,
            description="Optional job ID. If not provided, the latest job will be used.",
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            default=False,
            description="Whether to include detailed attempts information.",
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from Airbyte Cloud.

    The returned dict always contains `status`, `job_id` and `attempts`. When a sync
    result is found it also carries `bytes_synced`, `records_synced`, `start_time`
    (ISO 8601) and `job_url`. `attempts` is only filled in when `include_attempts`
    is true; otherwise it is an empty list.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # `job_id` is forwarded as-is, so `None` is passed through when the caller gave no ID.
    job: cloud.SyncResult | None = cloud_connection.get_sync_result(job_id=job_id)
    if not job:
        return {"status": None, "job_id": None, "attempts": []}

    status_report: dict[str, Any] = {
        "status": job.get_job_status(),
        "job_id": job.job_id,
        "bytes_synced": job.bytes_synced,
        "records_synced": job.records_synced,
        "start_time": job.start_time.isoformat(),
        "job_url": job.job_url,
        "attempts": [],
    }
    if not include_attempts:
        return status_report

    # Attempts are only fetched on request.
    status_report["attempts"] = [
        {
            "attempt_number": job_attempt.attempt_number,
            "attempt_id": job_attempt.attempt_id,
            "status": job_attempt.status,
            "bytes_synced": job_attempt.bytes_synced,
            "records_synced": job_attempt.records_synced,
            "created_at": job_attempt.created_at.isoformat(),
        }
        for job_attempt in job.get_attempts()
    ]
    return status_report

Get the status of a sync job from Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_sync_jobs( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_jobs: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=20, description='Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.')], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description='When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`.')], jobs_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`.')], job_type: typing.Annotated[airbyte_api.models.jobtypeenum.JobTypeEnum | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs.")]) -> SyncJobListResult:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_sync_jobs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
    max_jobs: Annotated[
        int,
        Field(
            default=20,
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            default=None,
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            default=None,
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
        ),
    ],
    job_type: Annotated[
        JobTypeEnum | None,
        Field(
            default=None,
            description=(
                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
                "If not specified, defaults to sync and reset jobs only (API default). "
                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
            ),
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool retrieves the sync jobs of a connection and lets the caller control
    ordering and pagination. Unless an offset is given, jobs are returned
    newest-first (from_tail=True).
    """
    # An offset counts from the beginning, so it cannot be combined with tail ordering.
    if from_tail is True and jobs_offset is not None:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # When ordering is unspecified: newest-first without an offset, oldest-first with one.
    if from_tail is None:
        from_tail = jobs_offset is None

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # Non-positive requests fall back to 20; large requests are capped at 500
    # to avoid overloading agent context.
    if max_jobs > 0:
        page_size = min(max_jobs, 500)
    else:
        page_size = 20

    previous_syncs = cloud_connection.get_previous_sync_logs(
        limit=page_size,
        offset=jobs_offset,
        from_tail=from_tail,
        job_type=job_type,
    )

    job_summaries = [
        SyncJobResult(
            job_id=previous_sync.job_id,
            status=str(previous_sync.get_job_status()),
            bytes_synced=previous_sync.bytes_synced,
            records_synced=previous_sync.records_synced,
            start_time=previous_sync.start_time.isoformat(),
            job_url=previous_sync.job_url,
        )
        for previous_sync in previous_syncs
    ]

    return SyncJobListResult(
        jobs=job_summaries,
        jobs_count=len(job_summaries),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )

List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter sources by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudSourceResult]:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            default=None,
            description="Optional case-insensitive substring to filter sources by name",
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            default=None,
            description="Optional maximum number of items to return (default: no limit)",
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace.

    The name filter is applied first, then the item limit.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    matching_sources = cloud_workspace.list_sources()

    # Case-insensitive substring match; sources without a name never match.
    if name_contains:
        lowered_filter = name_contains.lower()
        matching_sources = [
            candidate
            for candidate in matching_sources
            if candidate.name is not None and lowered_filter in candidate.name.lower()
        ]

    # Truncate to the requested number of items, if a limit was given.
    if max_items_limit is not None:
        matching_sources = matching_sources[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=cloud_source.source_id,
            name=cast(str, cloud_source.name),
            url=cast(str, cloud_source.connector_url),
        )
        for cloud_source in matching_sources
    ]

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter destinations by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudDestinationResult]:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            default=None,
            description="Optional case-insensitive substring to filter destinations by name",
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            default=None,
            description="Optional maximum number of items to return (default: no limit)",
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace.

    The name filter is applied first, then the item limit.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    matching_destinations = cloud_workspace.list_destinations()

    # Case-insensitive substring match; destinations without a name never match.
    if name_contains:
        lowered_filter = name_contains.lower()
        matching_destinations = [
            candidate
            for candidate in matching_destinations
            if candidate.name is not None and lowered_filter in candidate.name.lower()
        ]

    # Truncate to the requested number of items, if a limit was given.
    if max_items_limit is not None:
        matching_destinations = matching_destinations[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=cloud_destination.destination_id,
            name=cast(str, cloud_destination.name),
            url=cast(str, cloud_destination.connector_url),
        )
        for cloud_destination in matching_destinations
    ]

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the source to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudSourceDetails:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector.

    The result contains the source's ID, name, URL and connector definition ID.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # Read `name` first: accessing the property ensures `_connector_info` is
    # populated before the definition ID is read from it.
    resolved_name = cast(str, cloud_source.name)

    return CloudSourceDetails(
        source_id=cloud_source.source_id,
        source_name=resolved_name,
        source_url=cloud_source.connector_url,
        connector_definition_id=cloud_source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed source connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the destination to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudDestinationDetails:
@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(default=None, description=WORKSPACE_ID_TIP_TEXT),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector.

    The result contains the destination's ID, name, URL and connector definition ID.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)

    # Read `name` first: accessing the property ensures `_connector_info` is
    # populated before the definition ID is read from it.
    resolved_name = cast(str, cloud_destination.name)

    return CloudDestinationDetails(
        destination_id=cloud_destination.destination_id,
        destination_name=resolved_name,
        destination_url=cloud_destination.connector_url,
        connector_definition_id=cloud_destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed destination connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudConnectionDetails:
 988@mcp_tool(
 989    read_only=True,
 990    idempotent=True,
 991    open_world=True,
 992    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 993)
 994def describe_cloud_connection(
 995    ctx: Context,
 996    connection_id: Annotated[
 997        str,
 998        Field(description="The ID of the connection to describe."),
 999    ],
1000    *,
1001    workspace_id: Annotated[
1002        str | None,
1003        Field(
1004            description=WORKSPACE_ID_TIP_TEXT,
1005            default=None,
1006        ),
1007    ],
1008) -> CloudConnectionDetails:
1009    """Get detailed information about a specific deployed connection."""
1010    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1011    connection = workspace.get_connection(connection_id=connection_id)
1012
1013    return CloudConnectionDetails(
1014        connection_id=connection.connection_id,
1015        connection_name=cast(str, connection.name),
1016        connection_url=cast(str, connection.connection_url),
1017        source_id=connection.source_id,
1018        source_name=cast(str, connection.source.name),
1019        destination_id=connection.destination_id,
1020        destination_name=cast(str, connection.destination.name),
1021        selected_streams=connection.stream_names,
1022        table_prefix=connection.table_prefix,
1023    )

Get detailed information about a specific deployed connection.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_logs( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_lines: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=4000, description="Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.")], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`.")], line_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`.')]) -> LogReadResult:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    The attempt with the highest attempt number is used unless `attempt_number`
    is given. At most `max_lines` lines are returned (0 means no limit), taken
    from the end of the log when `from_tail` is in effect, or starting at
    `line_offset` otherwise. The result reports the 1-based line number of the
    first returned line and the total number of lines available.

    Raises:
        PyAirbyteInputError: If `line_offset` is combined with `from_tail=True`,
            or if `max_lines` or `line_offset` is negative.
        AirbyteMissingResourceError: If no sync job is found, the job has no
            attempts, or the requested attempt number does not exist.
    """
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # Negative values would otherwise be interpreted by Python slicing as
    # "count from the end", yielding the wrong lines and a non-positive
    # `log_text_start_line`. Reject them up front instead.
    if max_lines < 0:
        raise PyAirbyteInputError(
            message="'max_lines' must be 0 (no limit) or a positive integer.",
            context={"max_lines": max_lines},
        )
    if line_offset is not None and line_offset < 0:
        raise PyAirbyteInputError(
            message="'line_offset' must not be negative.",
            context={"line_offset": line_offset},
        )

    # Default to reading from the tail when neither option is specified
    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        # First attempt whose number matches the request, if any
        target_attempt = next(
            (attempt for attempt in attempts if attempt.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        # No attempt requested: use the one with the highest attempt number
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return a placeholder message with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Pick the window start: the last `effective_max` lines when reading from
    # the tail, otherwise the caller-supplied offset (0 if none was given).
    if from_tail:
        start_index = max(0, total_lines - effective_max)
    else:
        start_index = line_offset or 0
    selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_connections( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter connections by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')], with_connection_status: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description="If True, include status info for each connection's most recent sync job")], failing_connections_only: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description='If True, only return connections with failed/cancelled last sync')]) -> list[CloudConnectionResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.

    The max_items_limit is applied after all filtering. A limit of zero (or
    less) returns an empty list.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    # The limit is checked after each append below, so a non-positive limit
    # would otherwise still let one connection through. Return nothing instead,
    # matching how the source and destination list tools treat a limit of 0.
    if max_items_limit is not None and max_items_limit <= 0:
        return []

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            # Only the 5 most recent jobs are inspected for a completed one
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                # Record in-progress jobs, but keep looking for a completed one
                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            # Skip connections with no completed job or a non-failed last job
            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        # Stop early so no further per-connection status lookups are made
        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_workspaces( ctx: fastmcp.server.context.Context, *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional substring to filter workspaces by name (server-side filtering)')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudWorkspaceResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    # Pull credentials and the API root out of the MCP config. When no API URL is
    # configured, fall back to the default Cloud API root.
    raw_bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    raw_client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    raw_client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_root = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Both API calls below take the same connection arguments, so wrap the
    # credentials once. Unset (falsy) credentials are passed through as None.
    connection_kwargs: dict[str, Any] = {
        "api_root": api_root,
        "client_id": SecretString(raw_client_id) if raw_client_id else None,
        "client_secret": SecretString(raw_client_secret) if raw_client_secret else None,
        "bearer_token": SecretString(raw_bearer_token) if raw_bearer_token else None,
    }

    # Turn whichever of ID / name the caller supplied into a concrete organization ID.
    target_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        **connection_kwargs,
    )

    workspace_records = api_util.list_workspaces_in_organization(
        organization_id=target_org_id,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
        **connection_kwargs,
    )

    # Records are read with `.get`, so any missing field becomes an empty string.
    return [
        CloudWorkspaceResult(
            workspace_id=record.get("workspaceId", ""),
            workspace_name=record.get("name", ""),
            organization_id=record.get("organizationId", ""),
        )
        for record in workspace_records
    ]

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_organization( ctx: fastmcp.server.context.Context, *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')]) -> CloudOrganizationResult:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """

    def _secret_or_none(raw_value: str | None) -> SecretString | None:
        # Unset (falsy) credentials are forwarded as None rather than as empty secrets.
        return SecretString(raw_value) if raw_value else None

    # Read credentials and the API root from the MCP config. When no API URL is
    # configured, fall back to the default Cloud API root.
    raw_bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    raw_client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    raw_client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_root = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=_secret_or_none(raw_client_id),
        client_secret=_secret_or_none(raw_client_secret),
        bearer_token=_secret_or_none(raw_bearer_token),
    )

    # CloudOrganization loads its billing properties lazily, so reading them
    # while building the result may be what triggers the fetch.
    result = CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
        payment_status=organization.payment_status,
        subscription_status=organization.subscription_status,
        is_account_locked=organization.is_account_locked,
    )
    return result

Get details about a specific organization including billing status.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def publish_custom_source_definition( ctx: fastmcp.server.context.Context, name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A string with no newline is treated as a file path rather than inline YAML.
    # Anything else (multi-line string, Path, or None) is forwarded unchanged.
    manifest_input: str | Path | None = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    # Resolve testing values only when the caller supplied inline values and/or a
    # secret name. An empty resolved config is collapsed to None.
    resolved_testing_values: dict[str, Any] | None = None
    if not (testing_values is None and testing_values_secret_name is None):
        resolved_config = resolve_connector_config(
            config=testing_values,
            config_secret_name=testing_values_secret_name,
        )
        if resolved_config:
            resolved_testing_values = resolved_config

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    published_definition = cloud_workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest_input,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=resolved_testing_values,
    )

    # Record the new definition ID as created in this session.
    register_guid_created_in_session(published_definition.definition_id)

    description = _get_custom_source_definition_description(
        custom_source=published_definition,
    )
    return "Successfully published custom YAML source definition:\n" + description + "\n"

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Each output key is also the attribute name read from the definition object.
    summary_fields = (
        "definition_id",
        "name",
        "version",
        "connector_builder_project_url",
    )

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    yaml_definitions = cloud_workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {field: getattr(definition, field) for field in summary_fields}
        for definition in yaml_definitions
    ]

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(read_only=True, idempotent=True, open_world=True)
def get_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to retrieve.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the manifest YAML content,
    which can be used to inspect or store the connector configuration locally.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Each output key is also the attribute name read from the definition object.
    # Attributes are read in this order.
    detail_fields = (
        "definition_id",
        "name",
        "version",
        "connector_builder_project_id",
        "connector_builder_project_url",
        "manifest",
    )

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    yaml_definition = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    return {field: getattr(yaml_definition, field) for field in detail_fields}

Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the manifest YAML content, which can be used to inspect or store the connector configuration locally.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(destructive=True, open_world=True)
def update_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='New manifest as YAML string or file path. Optional; omit to update only testing values.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    # Session guard on the target ID runs before anything else.
    check_guid_created_in_session(definition_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Reject calls that supply nothing to update. The workspace is resolved first
    # so its ID can be included in the error context.
    update_inputs = (manifest_yaml, testing_values, testing_values_secret_name)
    if all(value is None for value in update_inputs):
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": cloud_workspace.workspace_id,
            },
        )

    # A string with no newline is treated as a file path rather than inline YAML.
    # Anything else (multi-line string, Path, or None) is forwarded unchanged.
    manifest_input: str | Path | None = (
        Path(manifest_yaml)
        if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        else manifest_yaml
    )

    # Resolve testing values only when inline values and/or a secret name were
    # supplied. An empty resolved config is collapsed to None.
    resolved_testing_values: dict[str, Any] | None = None
    if not (testing_values is None and testing_values_secret_name is None):
        resolved_config = resolve_connector_config(
            config=testing_values,
            config_secret_name=testing_values_secret_name,
        )
        if resolved_config:
            resolved_testing_values = resolved_config

    existing_definition = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    # Start from the fetched definition. If a manifest was supplied, continue with
    # whatever the manifest update returns.
    current_definition: CustomCloudSourceDefinition = existing_definition
    if manifest_input is not None:
        current_definition = existing_definition.update_definition(
            manifest_yaml=manifest_input,
            pre_validate=pre_validate,
        )

    if resolved_testing_values is not None:
        current_definition.set_testing_values(resolved_testing_values)

    description = _get_custom_source_definition_description(
        custom_source=current_definition,
    )
    return "Successfully updated custom YAML source definition:\n" + description

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.

@mcp_tool(destructive=True, open_world=True)
def permanently_delete_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the custom source definition (for verification).')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Session guard on the target ID runs before any lookup.
    check_guid_created_in_session(definition_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target_definition = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    # Read the name once, before deleting, so it can be used for both the
    # verification and the success message.
    found_name: str = target_definition.name

    # The caller-supplied name must match the stored name exactly.
    if found_name != name:
        mismatch_message = (
            f"Name mismatch: expected '{name}' but found '{found_name}'. "
            "The provided name must exactly match the definition's actual name. "
            "This is a safety measure to prevent accidental deletion."
        )
        raise PyAirbyteInputError(
            message=mismatch_message,
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # Safe mode is always on here as extra protection when driven by an LLM agent.
    target_definition.permanently_delete(safe_mode=True)

    return f"Successfully deleted custom source definition '{found_name}' (ID: {definition_id})"

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the source (for verification).')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Session guard on the target ID runs before any lookup.
    check_guid_created_in_session(source_id)

    # This tool takes no workspace_id argument, so none is passed to the helper.
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    target_source = cloud_workspace.get_source(source_id=source_id)

    # Read the name once, before deleting, so it can be used for both the
    # verification and the success message.
    found_name: str = cast(str, target_source.name)

    # The caller-supplied name must match the stored name exactly.
    if found_name != name:
        mismatch_message = (
            f"Name mismatch: expected '{name}' but found '{found_name}'. "
            "The provided name must exactly match the source's actual name. "
            "This is a safety measure to prevent accidental deletion."
        )
        raise PyAirbyteInputError(
            message=mismatch_message,
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # Safe mode is hard-coded on for extra protection when running in LLM agents.
    # It requires the name to contain "delete-me" or "deleteme" (case insensitive).
    cloud_workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,
    )

    return f"Successfully deleted source '{found_name}' (ID: {source_id})"

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.

The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the destination (for verification).')]) -> str:
@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # The session guard runs before any workspace lookup is attempted.
    check_guid_created_in_session(destination_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    deployed_name: str = cast(
        str,
        cloud_workspace.get_destination(destination_id=destination_id).name,
    )

    # The caller-supplied name must be an exact match of the deployed name;
    # otherwise the request is rejected before anything is deleted.
    if name != deployed_name:
        mismatch_details = {
            "destination_id": destination_id,
            "expected_name": name,
            "actual_name": deployed_name,
        }
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{deployed_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context=mismatch_details,
        )

    # `safe_mode` is deliberately not exposed as a tool argument: it is always
    # on, which requires the name-based delete disposition ("delete-me" or
    # "deleteme") as extra protection when running in LLM agents.
    cloud_workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,
    )
    return f"Successfully deleted destination '{deployed_name}' (ID: {destination_id})"

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.

The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the connection (for verification).')], *, cascade_delete_source: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the source connector associated with this connection.')] = False, cascade_delete_destination: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the destination connector associated with this connection.')] = False) -> str:
@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # The session guard runs before any workspace lookup is attempted.
    check_guid_created_in_session(connection_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    deployed_name: str = cast(
        str,
        cloud_workspace.get_connection(connection_id=connection_id).name,
    )

    # Reject the request unless the caller-supplied name is an exact match.
    if name != deployed_name:
        mismatch_details = {
            "connection_id": connection_id,
            "expected_name": name,
            "actual_name": deployed_name,
        }
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{deployed_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context=mismatch_details,
        )

    # `safe_mode` is always on (name-based delete disposition: "delete-me" or
    # "deleteme") for extra protection when running in LLM agents. The cascade
    # flags are forwarded unchanged to the workspace call.
    cloud_workspace.permanently_delete_connection(
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
        safe_mode=True,
    )
    return f"Successfully deleted connection '{deployed_name}' (ID: {connection_id})"

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.

The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # Rename first; the connector URL is read only after the rename succeeded.
    cloud_source.rename(name=name)
    source_url = cloud_source.connector_url
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source_url}"

Rename a deployed source connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # The session guard runs before any workspace lookup is attempted.
    check_guid_created_in_session(source_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # No connector spec is available at this point, so the config is resolved
    # without a JSON schema.
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_source.update_config(config=resolved_config)

    source_url = cloud_source.connector_url
    return f"Successfully updated source '{source_id}'. URL: {source_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)

    # Rename first; the connector URL is read only after the rename succeeded.
    cloud_destination.rename(name=name)
    destination_url = cloud_destination.connector_url
    return f"Successfully renamed destination '{destination_id}' to '{name}'. URL: {destination_url}"

Rename a deployed destination connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_destination_config( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    # The explicit `= None` mirrors `update_cloud_source_config`, so the
    # argument is also optional when this function is called directly from
    # Python rather than through the tool schema.
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Session guard: runs before any workspace lookup is attempted.
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Resolve the inline config and/or named secret into one config object.
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)

    # Returns a human-readable confirmation including the connector URL.
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # Rename first; the connection URL is read only after the rename succeeded.
    cloud_connection.rename(name=name)
    connection_url = cloud_connection.connection_url
    return f"Successfully renamed connection '{connection_id}' to '{name}'. URL: {connection_url}"

Rename a connection on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # The session guard runs before any workspace lookup is attempted.
    check_guid_created_in_session(connection_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)
    cloud_connection.set_table_prefix(prefix=prefix)

    # The URL is read after the prefix has been applied.
    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection_url}"
    )

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # The session guard runs before any workspace lookup is attempted.
    check_guid_created_in_session(connection_id)

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_connection = cloud_workspace.get_connection(connection_id=connection_id)

    # The input may be a single string or a list; normalize it to a list of
    # names before handing it to the connection.
    selected: list[str] = resolve_list_of_strings(stream_names)
    cloud_connection.set_selected_streams(stream_names=selected)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {selected}. URL: {connection_url}"
    )

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, destructive=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], *, enabled: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status.")], cron_expression: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'.")], manual_schedule: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'.")], workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive. Passing only 'manual_schedule=False' is rejected
    because it does not change anything.
    """
    # Raises ValueError when no effective change is requested or when the two
    # schedule options conflict. Returns a summary of the applied changes plus
    # the connection URL.
    check_guid_created_in_session(connection_id)

    # Validate that at least one setting is provided
    if enabled is None and cron_expression is None and manual_schedule is None:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # Only `manual_schedule=True` triggers a schedule change below. A lone
    # `manual_schedule=False` would apply nothing and still report success
    # with an empty change summary, so reject it up front.
    if enabled is None and cron_expression is None and manual_schedule is not True:
        raise ValueError(
            "No changes requested: 'manual_schedule=False' does not modify the schedule. "
            "Provide 'enabled', 'cron_expression', or 'manual_schedule=True'."
        )

    # Validate mutually exclusive schedule options
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Human-readable descriptions of every change that was applied.
    changes_made: list[str] = []

    # Apply enabled status change
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )

Update a connection's settings on Airbyte Cloud.

This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)

At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_connection_artifact( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], artifact_type: Annotated[Literal['state', 'catalog'], FieldInfo(annotation=NoneType, required=True, description="The type of artifact to retrieve: 'state' or 'catalog'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Which artifact is returned depends on `artifact_type`:
    - 'state': The full raw connection state, including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': The configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    # Resolve the workspace first, then look up the connection inside it.
    connection = _get_cloud_workspace(ctx, workspace_id).get_connection(
        connection_id=connection_id,
    )

    if artifact_type != "state":
        # Anything that is not "state" is handled as the "catalog" request.
        raw_catalog = connection.dump_raw_catalog()
        if raw_catalog is None:
            return {"ERROR": "No catalog found for this connection"}
        return raw_catalog

    raw_state = connection.dump_raw_state()
    # A "not_set" stateType is reported as an error payload, not returned as-is.
    if raw_state.get("stateType") == "not_set":
        return {"ERROR": "No state is set for this connection (stateType: not_set)"}
    return raw_state

Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all
  state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
  or {"ERROR": "..."} if not found.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

def register_cloud_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_cloud_tools(app: FastMCP) -> None:
    """Register this module's cloud tools on the given FastMCP app.

    Args:
        app: FastMCP application instance
    """
    # When AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET is truthy, `workspace_id` is passed
    # as an excluded argument; otherwise no arguments are excluded.
    if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET:
        excluded_args = ["workspace_id"]
    else:
        excluded_args = None

    register_mcp_tools(app, mcp_module=__name__, exclude_args=excluded_args)

Register cloud tools with the FastMCP app.

Arguments:
  • app: FastMCP application instance