airbyte.mcp.cloud

Airbyte Cloud MCP operations.

   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""Airbyte Cloud MCP operations."""
   3
   4from pathlib import Path
   5from typing import Annotated, Any, Literal, cast
   6
   7from airbyte_api.models import JobTypeEnum
   8from fastmcp import Context, FastMCP
   9from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
  10from pydantic import BaseModel, Field
  11
  12from airbyte import cloud, get_destination, get_source
  13from airbyte._util import api_util
  14from airbyte.cloud.connectors import CustomCloudSourceDefinition
  15from airbyte.cloud.constants import FAILED_STATUSES
  16from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
  17from airbyte.constants import (
  18    MCP_CONFIG_API_URL,
  19    MCP_CONFIG_BEARER_TOKEN,
  20    MCP_CONFIG_CLIENT_ID,
  21    MCP_CONFIG_CLIENT_SECRET,
  22    MCP_CONFIG_WORKSPACE_ID,
  23)
  24from airbyte.destinations.util import get_noop_destination
  25from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
  26from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
  27from airbyte.mcp._tool_utils import (
  28    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
  29    check_guid_created_in_session,
  30    register_guid_created_in_session,
  31)
  32from airbyte.secrets import SecretString
  33
  34
# Appended to each cloud tool's help text via `@mcp_tool(extra_help_text=...)`.
CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
# Shared `Field(description=...)` text for the optional `workspace_id` parameter
# accepted by every cloud tool in this module.
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."
  41
  42
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud.

    Summary model returned (as a list) by `list_deployed_cloud_source_connectors`.
    """

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
  52
  53
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud.

    Summary model returned (as a list) by `list_deployed_cloud_destination_connectors`.
    """

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
  63
  64
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud.

    The `last_job_*` and `currently_running_*` fields are optional status extras;
    they are only populated when the caller requests `with_connection_status=True`.
    """

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
  92
  93
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud.

    Returned by `describe_cloud_source` for a single source.
    """

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
 105
 106
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud.

    Detailed single-destination view, including the connector definition ID.
    """

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
 118
 119
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud.

    Includes the names (not just IDs) of the linked source and destination,
    plus the stream selection and table prefix configuration.
    """

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
 141
 142
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud.

    Billing-related fields default to None/False so that the model is still
    usable when billing data is unavailable.
    """

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed')."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state."""
 162
 163
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud.

    Returned by `check_airbyte_cloud_workspace`. Organization/billing fields
    require ORGANIZATION_READER permission and fall back to their defaults when
    that permission is missing.
    """

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed'). Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""
 189
 190
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support.

    `log_text_start_line` + `log_text_line_count` describe the returned window;
    compare against `total_log_lines_available` to detect truncation.
    """

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""
 206
 207
class SyncJobResult(BaseModel):
    """Information about a sync job.

    One entry in the `jobs` list of `SyncJobListResult`, built by
    `list_cloud_sync_jobs`.
    """

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""
 223
 224
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support.

    Returned by `list_cloud_sync_jobs`; echoes the effective pagination
    parameters so callers can issue follow-up requests.
    """

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
 236
 237
 238def _get_cloud_workspace(
 239    ctx: Context,
 240    workspace_id: str | None = None,
 241) -> CloudWorkspace:
 242    """Get an authenticated CloudWorkspace.
 243
 244    Resolves credentials from multiple sources via MCP config args in order:
 245    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
 246    2. Environment variables
 247
 248    The ctx parameter provides access to MCP config values that are resolved
 249    from HTTP headers or environment variables based on the config args
 250    defined in server.py.
 251    """
 252    resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID)
 253    if not resolved_workspace_id:
 254        raise PyAirbyteInputError(
 255            message="Workspace ID is required but not provided.",
 256            guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.",
 257        )
 258
 259    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
 260    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
 261    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
 262    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
 263
 264    return CloudWorkspace(
 265        workspace_id=resolved_workspace_id,
 266        client_id=SecretString(client_id) if client_id else None,
 267        client_secret=SecretString(client_secret) if client_secret else None,
 268        bearer_token=SecretString(bearer_token) if bearer_token else None,
 269        api_root=api_url,
 270    )
 271
 272
 273@mcp_tool(
 274    open_world=True,
 275    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 276)
 277def deploy_source_to_cloud(
 278    ctx: Context,
 279    source_name: Annotated[
 280        str,
 281        Field(description="The name to use when deploying the source."),
 282    ],
 283    source_connector_name: Annotated[
 284        str,
 285        Field(description="The name of the source connector (e.g., 'source-faker')."),
 286    ],
 287    *,
 288    workspace_id: Annotated[
 289        str | None,
 290        Field(
 291            description=WORKSPACE_ID_TIP_TEXT,
 292            default=None,
 293        ),
 294    ],
 295    config: Annotated[
 296        dict | str | None,
 297        Field(
 298            description="The configuration for the source connector.",
 299            default=None,
 300        ),
 301    ],
 302    config_secret_name: Annotated[
 303        str | None,
 304        Field(
 305            description="The name of the secret containing the configuration.",
 306            default=None,
 307        ),
 308    ],
 309    unique: Annotated[
 310        bool,
 311        Field(
 312            description="Whether to require a unique name.",
 313            default=True,
 314        ),
 315    ],
 316) -> str:
 317    """Deploy a source connector to Airbyte Cloud."""
 318    source = get_source(
 319        source_connector_name,
 320        no_executor=True,
 321    )
 322    config_dict = resolve_connector_config(
 323        config=config,
 324        config_secret_name=config_secret_name,
 325        config_spec_jsonschema=source.config_spec,
 326    )
 327    source.set_config(config_dict, validate=True)
 328
 329    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 330    deployed_source = workspace.deploy_source(
 331        name=source_name,
 332        source=source,
 333        unique=unique,
 334    )
 335
 336    register_guid_created_in_session(deployed_source.connector_id)
 337    return (
 338        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
 339        f" and URL: {deployed_source.connector_url}"
 340    )
 341
 342
 343@mcp_tool(
 344    open_world=True,
 345    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 346)
 347def deploy_destination_to_cloud(
 348    ctx: Context,
 349    destination_name: Annotated[
 350        str,
 351        Field(description="The name to use when deploying the destination."),
 352    ],
 353    destination_connector_name: Annotated[
 354        str,
 355        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
 356    ],
 357    *,
 358    workspace_id: Annotated[
 359        str | None,
 360        Field(
 361            description=WORKSPACE_ID_TIP_TEXT,
 362            default=None,
 363        ),
 364    ],
 365    config: Annotated[
 366        dict | str | None,
 367        Field(
 368            description="The configuration for the destination connector.",
 369            default=None,
 370        ),
 371    ],
 372    config_secret_name: Annotated[
 373        str | None,
 374        Field(
 375            description="The name of the secret containing the configuration.",
 376            default=None,
 377        ),
 378    ],
 379    unique: Annotated[
 380        bool,
 381        Field(
 382            description="Whether to require a unique name.",
 383            default=True,
 384        ),
 385    ],
 386) -> str:
 387    """Deploy a destination connector to Airbyte Cloud."""
 388    destination = get_destination(
 389        destination_connector_name,
 390        no_executor=True,
 391    )
 392    config_dict = resolve_connector_config(
 393        config=config,
 394        config_secret_name=config_secret_name,
 395        config_spec_jsonschema=destination.config_spec,
 396    )
 397    destination.set_config(config_dict, validate=True)
 398
 399    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 400    deployed_destination = workspace.deploy_destination(
 401        name=destination_name,
 402        destination=destination,
 403        unique=unique,
 404    )
 405
 406    register_guid_created_in_session(deployed_destination.connector_id)
 407    return (
 408        f"Successfully deployed destination '{destination_name}' "
 409        f"with ID: {deployed_destination.connector_id}"
 410    )
 411
 412
 413@mcp_tool(
 414    open_world=True,
 415    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 416)
 417def create_connection_on_cloud(
 418    ctx: Context,
 419    connection_name: Annotated[
 420        str,
 421        Field(description="The name of the connection."),
 422    ],
 423    source_id: Annotated[
 424        str,
 425        Field(description="The ID of the deployed source."),
 426    ],
 427    destination_id: Annotated[
 428        str,
 429        Field(description="The ID of the deployed destination."),
 430    ],
 431    selected_streams: Annotated[
 432        str | list[str],
 433        Field(
 434            description=(
 435                "The selected stream names to sync within the connection. "
 436                "Must be an explicit stream name or list of streams. "
 437                "Cannot be empty or '*'."
 438            )
 439        ),
 440    ],
 441    *,
 442    workspace_id: Annotated[
 443        str | None,
 444        Field(
 445            description=WORKSPACE_ID_TIP_TEXT,
 446            default=None,
 447        ),
 448    ],
 449    table_prefix: Annotated[
 450        str | None,
 451        Field(
 452            description="Optional table prefix to use when syncing to the destination.",
 453            default=None,
 454        ),
 455    ],
 456) -> str:
 457    """Create a connection between a deployed source and destination on Airbyte Cloud."""
 458    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
 459    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 460    deployed_connection = workspace.deploy_connection(
 461        connection_name=connection_name,
 462        source=source_id,
 463        destination=destination_id,
 464        selected_streams=resolved_streams_list,
 465        table_prefix=table_prefix,
 466    )
 467
 468    register_guid_created_in_session(deployed_connection.connection_id)
 469    return (
 470        f"Successfully created connection '{connection_name}' "
 471        f"with ID '{deployed_connection.connection_id}' and "
 472        f"URL: {deployed_connection.connection_url}"
 473    )
 474
 475
 476@mcp_tool(
 477    open_world=True,
 478    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 479)
 480def run_cloud_sync(
 481    ctx: Context,
 482    connection_id: Annotated[
 483        str,
 484        Field(description="The ID of the Airbyte Cloud connection."),
 485    ],
 486    *,
 487    workspace_id: Annotated[
 488        str | None,
 489        Field(
 490            description=WORKSPACE_ID_TIP_TEXT,
 491            default=None,
 492        ),
 493    ],
 494    wait: Annotated[
 495        bool,
 496        Field(
 497            description=(
 498                "Whether to wait for the sync to complete. Since a sync can take between several "
 499                "minutes and several hours, this option is not recommended for most "
 500                "scenarios."
 501            ),
 502            default=False,
 503        ),
 504    ],
 505    wait_timeout: Annotated[
 506        int,
 507        Field(
 508            description="Maximum time to wait for sync completion (seconds).",
 509            default=300,
 510        ),
 511    ],
 512) -> str:
 513    """Run a sync job on Airbyte Cloud."""
 514    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 515    connection = workspace.get_connection(connection_id=connection_id)
 516    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
 517
 518    if wait:
 519        status = sync_result.get_job_status()
 520        return (
 521            f"Sync completed with status: {status}. "
 522            f"Job ID is '{sync_result.job_id}' and "
 523            f"job URL is: {sync_result.job_url}"
 524        )
 525    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
 526
 527
 528@mcp_tool(
 529    read_only=True,
 530    idempotent=True,
 531    open_world=True,
 532    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 533)
 534def check_airbyte_cloud_workspace(
 535    ctx: Context,
 536    *,
 537    workspace_id: Annotated[
 538        str | None,
 539        Field(
 540            description=WORKSPACE_ID_TIP_TEXT,
 541            default=None,
 542        ),
 543    ],
 544) -> CloudWorkspaceResult:
 545    """Check if we have a valid Airbyte Cloud connection and return workspace info.
 546
 547    Returns workspace details including workspace ID, name, organization info, and billing status.
 548    """
 549    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 550
 551    # Get workspace details from the public API using workspace's credentials
 552    workspace_response = api_util.get_workspace(
 553        workspace_id=workspace.workspace_id,
 554        api_root=workspace.api_root,
 555        client_id=workspace.client_id,
 556        client_secret=workspace.client_secret,
 557        bearer_token=workspace.bearer_token,
 558    )
 559
 560    # Try to get organization info (including billing), but fail gracefully if we don't have
 561    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
 562    # organization, which may not be available with workspace-scoped credentials.
 563    organization = workspace.get_organization(raise_on_error=False)
 564
 565    return CloudWorkspaceResult(
 566        workspace_id=workspace_response.workspace_id,
 567        workspace_name=workspace_response.name,
 568        workspace_url=workspace.workspace_url,
 569        organization_id=(
 570            organization.organization_id
 571            if organization
 572            else "[unavailable - requires ORGANIZATION_READER permission]"
 573        ),
 574        organization_name=organization.organization_name if organization else None,
 575        payment_status=organization.payment_status if organization else None,
 576        subscription_status=organization.subscription_status if organization else None,
 577        is_account_locked=organization.is_account_locked if organization else False,
 578    )
 579
 580
 581@mcp_tool(
 582    open_world=True,
 583    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 584)
 585def deploy_noop_destination_to_cloud(
 586    ctx: Context,
 587    name: str = "No-op Destination",
 588    *,
 589    workspace_id: Annotated[
 590        str | None,
 591        Field(
 592            description=WORKSPACE_ID_TIP_TEXT,
 593            default=None,
 594        ),
 595    ],
 596    unique: bool = True,
 597) -> str:
 598    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
 599    destination = get_noop_destination()
 600    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 601    deployed_destination = workspace.deploy_destination(
 602        name=name,
 603        destination=destination,
 604        unique=unique,
 605    )
 606    register_guid_created_in_session(deployed_destination.connector_id)
 607    return (
 608        f"Successfully deployed No-op Destination "
 609        f"with ID '{deployed_destination.connector_id}' and "
 610        f"URL: {deployed_destination.connector_url}"
 611    )
 612
 613
 614@mcp_tool(
 615    read_only=True,
 616    idempotent=True,
 617    open_world=True,
 618    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 619)
 620def get_cloud_sync_status(
 621    ctx: Context,
 622    connection_id: Annotated[
 623        str,
 624        Field(
 625            description="The ID of the Airbyte Cloud connection.",
 626        ),
 627    ],
 628    job_id: Annotated[
 629        int | None,
 630        Field(
 631            description="Optional job ID. If not provided, the latest job will be used.",
 632            default=None,
 633        ),
 634    ],
 635    *,
 636    workspace_id: Annotated[
 637        str | None,
 638        Field(
 639            description=WORKSPACE_ID_TIP_TEXT,
 640            default=None,
 641        ),
 642    ],
 643    include_attempts: Annotated[
 644        bool,
 645        Field(
 646            description="Whether to include detailed attempts information.",
 647            default=False,
 648        ),
 649    ],
 650) -> dict[str, Any]:
 651    """Get the status of a sync job from the Airbyte Cloud."""
 652    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 653    connection = workspace.get_connection(connection_id=connection_id)
 654
 655    # If a job ID is provided, get the job by ID.
 656    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
 657
 658    if not sync_result:
 659        return {"status": None, "job_id": None, "attempts": []}
 660
 661    result = {
 662        "status": sync_result.get_job_status(),
 663        "job_id": sync_result.job_id,
 664        "bytes_synced": sync_result.bytes_synced,
 665        "records_synced": sync_result.records_synced,
 666        "start_time": sync_result.start_time.isoformat(),
 667        "job_url": sync_result.job_url,
 668        "attempts": [],
 669    }
 670
 671    if include_attempts:
 672        attempts = sync_result.get_attempts()
 673        result["attempts"] = [
 674            {
 675                "attempt_number": attempt.attempt_number,
 676                "attempt_id": attempt.attempt_id,
 677                "status": attempt.status,
 678                "bytes_synced": attempt.bytes_synced,
 679                "records_synced": attempt.records_synced,
 680                "created_at": attempt.created_at.isoformat(),
 681            }
 682            for attempt in attempts
 683        ]
 684
 685    return result
 686
 687
 688@mcp_tool(
 689    read_only=True,
 690    idempotent=True,
 691    open_world=True,
 692    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 693)
 694def list_cloud_sync_jobs(
 695    ctx: Context,
 696    connection_id: Annotated[
 697        str,
 698        Field(description="The ID of the Airbyte Cloud connection."),
 699    ],
 700    *,
 701    workspace_id: Annotated[
 702        str | None,
 703        Field(
 704            description=WORKSPACE_ID_TIP_TEXT,
 705            default=None,
 706        ),
 707    ],
 708    max_jobs: Annotated[
 709        int,
 710        Field(
 711            description=(
 712                "Maximum number of jobs to return. "
 713                "Defaults to 20 if not specified. "
 714                "Maximum allowed value is 500."
 715            ),
 716            default=20,
 717        ),
 718    ],
 719    from_tail: Annotated[
 720        bool | None,
 721        Field(
 722            description=(
 723                "When True, jobs are ordered newest-first (createdAt DESC). "
 724                "When False, jobs are ordered oldest-first (createdAt ASC). "
 725                "Defaults to True if `jobs_offset` is not specified. "
 726                "Cannot combine `from_tail=True` with `jobs_offset`."
 727            ),
 728            default=None,
 729        ),
 730    ],
 731    jobs_offset: Annotated[
 732        int | None,
 733        Field(
 734            description=(
 735                "Number of jobs to skip from the beginning. "
 736                "Cannot be combined with `from_tail=True`."
 737            ),
 738            default=None,
 739        ),
 740    ],
 741    job_type: Annotated[
 742        JobTypeEnum | None,
 743        Field(
 744            description=(
 745                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
 746                "If not specified, defaults to sync and reset jobs only (API default). "
 747                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
 748            ),
 749            default=None,
 750        ),
 751    ],
 752) -> SyncJobListResult:
 753    """List sync jobs for a connection with pagination support.
 754
 755    This tool allows you to retrieve a list of sync jobs for a connection,
 756    with control over ordering and pagination. By default, jobs are returned
 757    newest-first (from_tail=True).
 758    """
 759    # Validate that jobs_offset and from_tail are not both set
 760    if jobs_offset is not None and from_tail is True:
 761        raise PyAirbyteInputError(
 762            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
 763            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
 764        )
 765
 766    # Default to from_tail=True if neither is specified
 767    if from_tail is None and jobs_offset is None:
 768        from_tail = True
 769    elif from_tail is None:
 770        from_tail = False
 771
 772    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 773    connection = workspace.get_connection(connection_id=connection_id)
 774
 775    # Cap at 500 to avoid overloading agent context
 776    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20
 777
 778    sync_results = connection.get_previous_sync_logs(
 779        limit=effective_limit,
 780        offset=jobs_offset,
 781        from_tail=from_tail,
 782        job_type=job_type,
 783    )
 784
 785    jobs = [
 786        SyncJobResult(
 787            job_id=sync_result.job_id,
 788            status=str(sync_result.get_job_status()),
 789            bytes_synced=sync_result.bytes_synced,
 790            records_synced=sync_result.records_synced,
 791            start_time=sync_result.start_time.isoformat(),
 792            job_url=sync_result.job_url,
 793        )
 794        for sync_result in sync_results
 795    ]
 796
 797    return SyncJobListResult(
 798        jobs=jobs,
 799        jobs_count=len(jobs),
 800        jobs_offset=jobs_offset or 0,
 801        from_tail=from_tail,
 802    )
 803
 804
 805@mcp_tool(
 806    read_only=True,
 807    idempotent=True,
 808    open_world=True,
 809    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 810)
 811def list_deployed_cloud_source_connectors(
 812    ctx: Context,
 813    *,
 814    workspace_id: Annotated[
 815        str | None,
 816        Field(
 817            description=WORKSPACE_ID_TIP_TEXT,
 818            default=None,
 819        ),
 820    ],
 821    name_contains: Annotated[
 822        str | None,
 823        Field(
 824            description="Optional case-insensitive substring to filter sources by name",
 825            default=None,
 826        ),
 827    ],
 828    max_items_limit: Annotated[
 829        int | None,
 830        Field(
 831            description="Optional maximum number of items to return (default: no limit)",
 832            default=None,
 833        ),
 834    ],
 835) -> list[CloudSourceResult]:
 836    """List all deployed source connectors in the Airbyte Cloud workspace."""
 837    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 838    sources = workspace.list_sources()
 839
 840    # Filter by name if requested
 841    if name_contains:
 842        needle = name_contains.lower()
 843        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]
 844
 845    # Apply limit if requested
 846    if max_items_limit is not None:
 847        sources = sources[:max_items_limit]
 848
 849    # Note: name and url are guaranteed non-null from list API responses
 850    return [
 851        CloudSourceResult(
 852            id=source.source_id,
 853            name=cast(str, source.name),
 854            url=cast(str, source.connector_url),
 855        )
 856        for source in sources
 857    ]
 858
 859
 860@mcp_tool(
 861    read_only=True,
 862    idempotent=True,
 863    open_world=True,
 864    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 865)
 866def list_deployed_cloud_destination_connectors(
 867    ctx: Context,
 868    *,
 869    workspace_id: Annotated[
 870        str | None,
 871        Field(
 872            description=WORKSPACE_ID_TIP_TEXT,
 873            default=None,
 874        ),
 875    ],
 876    name_contains: Annotated[
 877        str | None,
 878        Field(
 879            description="Optional case-insensitive substring to filter destinations by name",
 880            default=None,
 881        ),
 882    ],
 883    max_items_limit: Annotated[
 884        int | None,
 885        Field(
 886            description="Optional maximum number of items to return (default: no limit)",
 887            default=None,
 888        ),
 889    ],
 890) -> list[CloudDestinationResult]:
 891    """List all deployed destination connectors in the Airbyte Cloud workspace."""
 892    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 893    destinations = workspace.list_destinations()
 894
 895    # Filter by name if requested
 896    if name_contains:
 897        needle = name_contains.lower()
 898        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]
 899
 900    # Apply limit if requested
 901    if max_items_limit is not None:
 902        destinations = destinations[:max_items_limit]
 903
 904    # Note: name and url are guaranteed non-null from list API responses
 905    return [
 906        CloudDestinationResult(
 907            id=destination.destination_id,
 908            name=cast(str, destination.name),
 909            url=cast(str, destination.connector_url),
 910        )
 911        for destination in destinations
 912    ]
 913
 914
 915@mcp_tool(
 916    read_only=True,
 917    idempotent=True,
 918    open_world=True,
 919    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 920)
 921def describe_cloud_source(
 922    ctx: Context,
 923    source_id: Annotated[
 924        str,
 925        Field(description="The ID of the source to describe."),
 926    ],
 927    *,
 928    workspace_id: Annotated[
 929        str | None,
 930        Field(
 931            description=WORKSPACE_ID_TIP_TEXT,
 932            default=None,
 933        ),
 934    ],
 935) -> CloudSourceDetails:
 936    """Get detailed information about a specific deployed source connector."""
 937    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 938    source = workspace.get_source(source_id=source_id)
 939
 940    # Access name property to ensure _connector_info is populated
 941    source_name = cast(str, source.name)
 942
 943    return CloudSourceDetails(
 944        source_id=source.source_id,
 945        source_name=source_name,
 946        source_url=source.connector_url,
 947        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 948    )
 949
 950
 951@mcp_tool(
 952    read_only=True,
 953    idempotent=True,
 954    open_world=True,
 955    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 956)
 957def describe_cloud_destination(
 958    ctx: Context,
 959    destination_id: Annotated[
 960        str,
 961        Field(description="The ID of the destination to describe."),
 962    ],
 963    *,
 964    workspace_id: Annotated[
 965        str | None,
 966        Field(
 967            description=WORKSPACE_ID_TIP_TEXT,
 968            default=None,
 969        ),
 970    ],
 971) -> CloudDestinationDetails:
 972    """Get detailed information about a specific deployed destination connector."""
 973    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
 974    destination = workspace.get_destination(destination_id=destination_id)
 975
 976    # Access name property to ensure _connector_info is populated
 977    destination_name = cast(str, destination.name)
 978
 979    return CloudDestinationDetails(
 980        destination_id=destination.destination_id,
 981        destination_name=destination_name,
 982        destination_url=destination.connector_url,
 983        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
 984    )
 985
 986
 987@mcp_tool(
 988    read_only=True,
 989    idempotent=True,
 990    open_world=True,
 991    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 992)
 993def describe_cloud_connection(
 994    ctx: Context,
 995    connection_id: Annotated[
 996        str,
 997        Field(description="The ID of the connection to describe."),
 998    ],
 999    *,
1000    workspace_id: Annotated[
1001        str | None,
1002        Field(
1003            description=WORKSPACE_ID_TIP_TEXT,
1004            default=None,
1005        ),
1006    ],
1007) -> CloudConnectionDetails:
1008    """Get detailed information about a specific deployed connection."""
1009    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1010    connection = workspace.get_connection(connection_id=connection_id)
1011
1012    return CloudConnectionDetails(
1013        connection_id=connection.connection_id,
1014        connection_name=cast(str, connection.name),
1015        connection_url=cast(str, connection.connection_url),
1016        source_id=connection.source_id,
1017        source_name=cast(str, connection.source.name),
1018        destination_id=connection.destination_id,
1019        destination_name=cast(str, connection.destination.name),
1020        selected_streams=connection.stream_names,
1021        table_prefix=connection.table_prefix,
1022    )
1023
1024
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # `line_offset` and `from_tail` are mutually exclusive paging strategies.
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # When the caller expresses no paging preference, default to tailing.
    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # `job_id=None` resolves to the connection's most recent sync job.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()
    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        # Find the specific attempt requested by the caller.
        target_attempt = next(
            (a for a in attempts if a.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        # Default to the highest-numbered (latest) attempt.
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()
    if not logs:
        # Return an explanatory placeholder with zero-line metadata.
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting.
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # `max_lines == 0` means "no limit".
    effective_max = total_lines if max_lines == 0 else max_lines

    if from_tail:
        # Take up to `effective_max` lines from the end of the log.
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        # Take up to `effective_max` lines starting at the requested offset.
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )
1171
1172
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    candidates = workspace.list_connections()

    # Apply the optional case-insensitive name filter.
    if name_contains:
        search_term = name_contains.lower()
        candidates = [
            conn
            for conn in candidates
            if conn.name is not None and search_term in conn.name.lower()
        ]

    # Filtering on failures requires the status lookup below.
    if failing_connections_only:
        with_connection_status = True

    collected: list[CloudConnectionResult] = []

    for conn in candidates:
        status_str: str | None = None
        completed_job_id: int | None = None
        completed_job_time: str | None = None
        running_job_id: int | None = None
        running_job_start: str | None = None

        if with_connection_status:
            completed_status_enum = None  # Keep the enum form for FAILED_STATUSES check
            for sync_result in conn.get_previous_sync_logs(limit=5):
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    # Remember the in-progress job, then keep scanning for a finished one.
                    running_job_id = sync_result.job_id
                    running_job_start = sync_result.start_time.isoformat()
                    continue

                # First completed job found; record it and stop scanning.
                completed_status_enum = job_status
                status_str = str(job_status.value) if job_status else None
                completed_job_id = sync_result.job_id
                completed_job_time = sync_result.start_time.isoformat()
                break

            should_skip = failing_connections_only and (
                completed_status_enum is None
                or completed_status_enum not in FAILED_STATUSES
            )
            if should_skip:
                continue

        collected.append(
            CloudConnectionResult(
                id=conn.connection_id,
                name=cast(str, conn.name),
                url=cast(str, conn.connection_url),
                source_id=conn.source_id,
                destination_id=conn.destination_id,
                last_job_status=status_str,
                last_job_id=completed_job_id,
                last_job_time=completed_job_time,
                currently_running_job_id=running_job_id,
                currently_running_job_start_time=running_job_start,
            )
        )

        # Stop early once the requested number of results is reached.
        if max_items_limit is not None and len(collected) >= max_items_limit:
            break

    return collected
1292
1293
def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> CloudOrganization:
    """Resolve organization from either ID or exact name match.

    Args:
        organization_id: The organization ID (if provided directly)
        organization_name: The organization name (exact match required)
        api_root: The API root URL
        client_id: OAuth client ID (optional if bearer_token is provided)
        client_secret: OAuth client secret (optional if bearer_token is provided)
        bearer_token: Bearer token for authentication (optional if client credentials provided)

    Returns:
        A CloudOrganization object with credentials for lazy loading of billing info.

    Raises:
        PyAirbyteInputError: If neither or both parameters are provided,
            or if no organization matches the exact name
        AirbyteMissingResourceError: If the organization is not found
    """
    # Exactly one of the two selectors must be given.
    if organization_id and organization_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not organization_id and not organization_name:
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Fetch every organization visible to the current user.
    user_orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )

    if organization_id:
        matches = [org for org in user_orgs if org.organization_id == organization_id]
        if not matches:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
    else:
        # Exact, case-sensitive name match.
        matches = [org for org in user_orgs if org.organization_name == organization_name]

        if not matches:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_name": organization_name,
                    "message": f"No organization found with exact name '{organization_name}' "
                    "for the current user.",
                },
            )

        if len(matches) > 1:
            raise PyAirbyteInputError(
                message=f"Multiple organizations found with name '{organization_name}'. "
                "Please use 'organization_id' instead to specify the exact organization."
            )

    org_response: api_util.models.OrganizationResponse = matches[0]

    # Carry the credentials along so billing info can be loaded lazily later.
    return CloudOrganization(
        organization_id=org_response.organization_id,
        organization_name=org_response.organization_name,
        email=org_response.email,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )
1385
1386
def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> str:
    """Resolve organization ID from either ID or exact name match.

    This is a convenience wrapper around _resolve_organization that returns just the ID.
    """
    return _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    ).organization_id
1409
1410
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Wrap credentials once; both API calls below need the same values.
    client_id_secret = SecretString(client_id) if client_id else None
    client_secret_secret = SecretString(client_secret) if client_secret else None
    bearer_token_secret = SecretString(bearer_token) if bearer_token else None

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
    )

    workspaces = api_util.list_workspaces_in_organization(
        organization_id=resolved_org_id,
        api_root=api_url,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    # The API returns dicts; default missing keys to empty strings.
    return [
        CloudWorkspaceResult(
            workspace_id=ws.get("workspaceId", ""),
            workspace_name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]
1489
1490
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). " "Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)

    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    # Billing-related properties are lazily loaded by CloudOrganization.
    return CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
        payment_status=organization.payment_status,
        subscription_status=organization.subscription_status,
        is_account_locked=organization.is_account_locked,
    )
1545
1546
1547def _get_custom_source_definition_description(
1548    custom_source: CustomCloudSourceDefinition,
1549) -> str:
1550    return "\n".join(
1551        [
1552            f" - Custom Source Name: {custom_source.name}",
1553            f" - Definition ID: {custom_source.definition_id}",
1554            f" - Definition Version: {custom_source.version}",
1555            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
1556            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
1557        ]
1558    )
1559
1560
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ] = None,
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A single-line string is treated as a file path rather than inline YAML content.
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret.
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    # Track the new definition so later session tools can see it was created here.
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )
1666
1667
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    yaml_definitions = workspace.list_custom_source_definitions(definition_type="yaml")

    results: list[dict[str, Any]] = []
    for definition in yaml_definitions:
        results.append(
            {
                "definition_id": definition.definition_id,
                "name": definition.name,
                "version": definition.version,
                "connector_builder_project_url": definition.connector_builder_project_url,
            }
        )
    return results
1703
1704
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_draft: Annotated[
        bool,
        Field(
            description=(
                "Whether to include the Connector Builder draft manifest in the response. "
                "If True and a draft exists, the response will include 'has_draft' and "
                "'draft_manifest' fields. Defaults to False."
            ),
            default=False,
        ),
    ] = False,
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the published manifest YAML content.
    Optionally includes the Connector Builder draft manifest (unpublished changes)
    when include_draft=True.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    details: dict[str, Any] = {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "version": definition.version,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "manifest": definition.manifest,
    }

    # Draft details are only attached when explicitly requested.
    if include_draft:
        details.update(
            {
                "has_draft": definition.has_draft,
                "draft_manifest": definition.draft_manifest,
            }
        )

    return details
1765
1766
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connector_builder_draft_manifest(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve the draft for."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the Connector Builder draft manifest for a custom source definition.

    Returns the working draft manifest that has been saved in the Connector Builder UI
    but not yet published. This is useful for inspecting what a user is currently working
    on before they publish their changes.

    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
    The published manifest is always included for comparison.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    defn = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    # Always include the published manifest next to the draft for comparison.
    return dict(
        definition_id=defn.definition_id,
        name=defn.name,
        connector_builder_project_id=defn.connector_builder_project_id,
        connector_builder_project_url=defn.connector_builder_project_url,
        has_draft=defn.has_draft,
        draft_manifest=defn.draft_manifest,
        published_manifest=defn.manifest,
    )
1811
1812
@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    # Session guard (helper defined elsewhere in this module) — presumably restricts
    # mutation to resources created in this MCP session; verify against the helper.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Reject no-op calls up front: with all three inputs absent there is nothing to update.
    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    # Heuristic: a single-line string is treated as a file path; any string
    # containing a newline is assumed to be raw YAML manifest content.
    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        # Coerce a falsy (e.g. empty) resolved config back to None so the
        # set_testing_values() call below is skipped entirely.
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    # update_definition() returns a definition object; rebind custom_source so
    # the summary below reflects the updated definition.
    if processed_manifest is not None:
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    # Testing values are applied after any manifest update, replacing the full set.
    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )
1933
1934
@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    defn = cloud_workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = defn.name

    # Refuse to proceed unless the caller-supplied name matches exactly.
    if name != actual_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    defn.permanently_delete(
        safe_mode=True,  # Always on for extra protection when driven by LLM agents.
    )
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
2000
2001
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    cloud_source = cloud_workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, cloud_source.name)

    # Refuse to proceed unless the caller-supplied name matches exactly.
    if name != actual_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    cloud_workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Always on: name must contain "delete-me" or "deleteme" (any case).
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
2056
2057
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, cloud_destination.name)

    # Refuse to proceed unless the caller-supplied name matches exactly.
    if name != actual_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    cloud_workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Always on: name must contain "delete-me" or "deleteme" (any case).
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
2112
2113
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    conn = cloud_workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, conn.name)

    # Refuse to proceed unless the caller-supplied name matches exactly.
    if name != actual_name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    cloud_workspace.permanently_delete_connection(
        safe_mode=True,  # Always on: name must contain "delete-me" or "deleteme" (any case).
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
2189
2190
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)
    cloud_source.rename(name=name)
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {cloud_source.connector_url}"
2219
2220
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_source = cloud_workspace.get_source(source_id=source_id)

    # Merge the inline config with the named secret (if any); no spec available here.
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    cloud_source.update_config(config=resolved_config)
    return f"Successfully updated source '{source_id}'. URL: {cloud_source.connector_url}"
2271
2272
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    cloud_destination = cloud_workspace.get_destination(destination_id=destination_id)
    cloud_destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {cloud_destination.connector_url}"
    )
2304
2305
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,  # Python-level default added for consistency with update_cloud_source_config.
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Merge the inline config with the named secret (if any); no spec available here.
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return f"Successfully updated destination '{destination_id}'. URL: {destination.connector_url}"
2358
2359
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    conn = cloud_workspace.get_connection(connection_id=connection_id)
    conn.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {conn.connection_url}"
    )
2391
2392
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    conn = cloud_workspace.get_connection(connection_id=connection_id)
    conn.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {conn.connection_url}"
    )
2430
2431
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    conn = cloud_workspace.get_connection(connection_id=connection_id)

    # Normalize the input (single name or list) into a concrete list of names.
    selected: list[str] = resolve_list_of_strings(stream_names)
    conn.set_selected_streams(stream_names=selected)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {selected}. URL: {conn.connection_url}"
    )
2477
2478
@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive.
    """
    check_guid_created_in_session(connection_id)

    # At least one of the optional settings must be supplied.
    if all(setting is None for setting in (enabled, cron_expression, manual_schedule)):
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # A cron schedule and manual-only scheduling are mutually exclusive.
    if manual_schedule is True and cron_expression is not None:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    conn = cloud_workspace.get_connection(connection_id=connection_id)

    applied: list[str] = []

    # Toggle enabled/disabled status if requested.
    if enabled is not None:
        conn.set_enabled(enabled=enabled)
        applied.append(f"status set to '{'enabled' if enabled else 'disabled'}'")

    # Apply exactly one schedule change (cron takes precedence; exclusivity
    # with manual_schedule=True was validated above).
    if cron_expression is not None:
        conn.set_schedule(cron_expression=cron_expression)
        applied.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        conn.set_manual_schedule()
        applied.append("schedule set to 'manual'")

    return (
        f"Successfully updated connection '{connection_id}': {', '.join(applied)}. "
        f"URL: {conn.connection_url}"
    )
2587
2588
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the full raw connection state including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    cloud_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    conn = cloud_workspace.get_connection(connection_id=connection_id)

    if artifact_type == "state":
        state = conn.dump_raw_state()
        # A stateType of "not_set" marks a connection with no stored state.
        if state.get("stateType") == "not_set":
            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
        return state

    # Any non-"state" artifact_type falls through to the catalog path.
    catalog = conn.dump_raw_catalog()
    if catalog is None:
        return {"ERROR": "No catalog found for this connection"}
    return catalog
2636
2637
def _add_defaults_for_exclude_args(
    exclude_args: list[str],
) -> None:
    """Patch registered tool functions to add Python-level defaults for excluded args.

    FastMCP requires every excluded argument to carry a Python-level default, while the
    tool functions in this module deliberately express defaults only via
    Field(default=...) inside their Annotated hints. To reconcile the two, this helper
    rewrites each registered function's signature at registration time, giving a `None`
    default to any excluded parameter that currently has none. The source code stays
    clean and FastMCP's requirement is still satisfied.

    Args:
        exclude_args: List of argument names that will be excluded from the tool schema.
    """
    import inspect  # noqa: PLC0415  # Local import for optional patching logic

    from fastmcp_extensions.decorators import (  # noqa: PLC0415
        _REGISTERED_TOOLS,  # noqa: PLC2701
    )

    empty = inspect.Parameter.empty
    for func, _annotations in _REGISTERED_TOOLS:
        sig = inspect.signature(func)
        # Parameters that need a default injected: excluded AND currently default-less.
        replacements = {
            pname: param.replace(default=None)
            for pname, param in sig.parameters.items()
            if pname in exclude_args and param.default is empty
        }
        if replacements:
            patched = [
                replacements.get(pname, param) for pname, param in sig.parameters.items()
            ]
            func.__signature__ = sig.replace(parameters=patched)  # type: ignore[attr-defined]
2673
2674
def register_cloud_tools(app: FastMCP) -> None:
    """Register cloud tools with the FastMCP app.

    When a workspace ID is preconfigured via environment, the `workspace_id`
    argument is hidden from every tool schema (and patched to have a default).

    Args:
        app: FastMCP application instance
    """
    exclude_args: list[str] | None = None
    if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET:
        exclude_args = ["workspace_id"]
        _add_defaults_for_exclude_args(exclude_args)
    register_mcp_tools(
        app,
        mcp_module=__name__,
        exclude_args=exclude_args,
    )
CLOUD_AUTH_TIP_TEXT = 'By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables will be used to authenticate with the Airbyte Cloud API.'
WORKSPACE_ID_TIP_TEXT = 'Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.'
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""

Information about a deployed source connector in Airbyte Cloud.

id: str = PydanticUndefined

The source ID.

name: str = PydanticUndefined

Display name of the source.

url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""

Information about a deployed destination connector in Airbyte Cloud.

id: str = PydanticUndefined

The destination ID.

name: str = PydanticUndefined

Display name of the destination.

url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""

Information about a deployed connection in Airbyte Cloud.

id: str = PydanticUndefined

The connection ID.

name: str = PydanticUndefined

Display name of the connection.

url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

last_job_status: str | None = None

Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.

last_job_id: int | None = None

Job ID of the most recent completed sync. Only populated when with_connection_status=True.

last_job_time: str | None = None

ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.

currently_running_job_id: int | None = None

Job ID of a currently running sync, if any. Only populated when with_connection_status=True.

currently_running_job_start_time: str | None = None

ISO 8601 timestamp of when the currently running sync started. Only populated when with_connection_status=True.

class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""

Detailed information about a deployed source connector in Airbyte Cloud.

source_id: str = PydanticUndefined

The source ID.

source_name: str = PydanticUndefined

Display name of the source.

source_url: str = PydanticUndefined

Web URL for managing this source in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'source-postgres').

class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""

Detailed information about a deployed destination connector in Airbyte Cloud.

destination_id: str = PydanticUndefined

The destination ID.

destination_name: str = PydanticUndefined

Display name of the destination.

destination_url: str = PydanticUndefined

Web URL for managing this destination in Airbyte Cloud.

connector_definition_id: str = PydanticUndefined

The connector definition ID (e.g., the ID for 'destination-snowflake').

class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""

Detailed information about a deployed connection in Airbyte Cloud.

connection_id: str = PydanticUndefined

The connection ID.

connection_name: str = PydanticUndefined

Display name of the connection.

connection_url: str = PydanticUndefined

Web URL for managing this connection in Airbyte Cloud.

source_id: str = PydanticUndefined

ID of the source used by this connection.

source_name: str = PydanticUndefined

Display name of the source.

destination_id: str = PydanticUndefined

ID of the destination used by this connection.

destination_name: str = PydanticUndefined

Display name of the destination.

selected_streams: list[str] = PydanticUndefined

List of stream names selected for syncing.

table_prefix: str | None = PydanticUndefined

Table prefix applied when syncing to the destination.

class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed')."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state."""

Information about an organization in Airbyte Cloud.

id: str = PydanticUndefined

The organization ID.

name: str = PydanticUndefined

Display name of the organization.

email: str = PydanticUndefined

Email associated with the organization.

payment_status: str | None = None

Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices.

subscription_status: str | None = None

Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 'unsubscribed').

is_account_locked: bool = False

Whether the account is locked due to billing issues. True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.

class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed'). Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""

Information about a workspace in Airbyte Cloud.

workspace_id: str = PydanticUndefined

The workspace ID.

workspace_name: str = PydanticUndefined

Display name of the workspace.

workspace_url: str | None = None

URL to access the workspace in Airbyte Cloud.

organization_id: str = PydanticUndefined

ID of the organization (requires ORGANIZATION_READER permission).

organization_name: str | None = None

Name of the organization (requires ORGANIZATION_READER permission).

payment_status: str | None = None

Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices. Requires ORGANIZATION_READER permission.

subscription_status: str | None = None

Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 'unsubscribed'). Requires ORGANIZATION_READER permission.

is_account_locked: bool = False

Whether the account is locked due to billing issues. True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state. Requires ORGANIZATION_READER permission.

class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""

Result of reading sync logs with pagination support.

job_id: int = PydanticUndefined

The job ID the logs belong to.

attempt_number: int = PydanticUndefined

The attempt number the logs belong to.

log_text: str = PydanticUndefined

The string containing the log text we are returning.

log_text_start_line: int = PydanticUndefined

1-based line index of the first line returned.

log_text_line_count: int = PydanticUndefined

Count of lines we are returning.

total_log_lines_available: int = PydanticUndefined

Total number of log lines available, shows if any lines were missed due to the limit.

class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""

Information about a sync job.

job_id: int = PydanticUndefined

The job ID.

status: str = PydanticUndefined

The job status (e.g., 'succeeded', 'failed', 'running', 'pending').

bytes_synced: int = PydanticUndefined

Number of bytes synced in this job.

records_synced: int = PydanticUndefined

Number of records synced in this job.

start_time: str = PydanticUndefined

ISO 8601 timestamp of when the job started.

job_url: str = PydanticUndefined

URL to view the job in Airbyte Cloud.

class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""

Result of listing sync jobs with pagination support.

jobs: list[SyncJobResult] = PydanticUndefined

List of sync jobs.

jobs_count: int = PydanticUndefined

Number of jobs returned in this response.

jobs_offset: int = PydanticUndefined

Offset used for this request (0 if not specified).

from_tail: bool = PydanticUndefined

Whether jobs are ordered newest-first (True) or oldest-first (False).

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_source_to_cloud(
    ctx: Context,
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    config: Annotated[
        dict | str | None,
        Field(description="The configuration for the source connector.", default=None),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(description="The name of the secret containing the configuration.", default=None),
    ],
    unique: Annotated[
        bool,
        Field(description="Whether to require a unique name.", default=True),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # No executor needed: we only validate config locally before deploying.
    connector = get_source(source_connector_name, no_executor=True)
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=connector.config_spec,
    )
    connector.set_config(resolved_config, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed = workspace.deploy_source(name=source_name, source=connector, unique=unique)

    # Track the new resource so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(deployed.connector_id)
    return (
        f"Successfully deployed source '{source_name}' with ID '{deployed.connector_id}'"
        f" and URL: {deployed.connector_url}"
    )

Deploy a source connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    ctx: Context,
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    # No executor needed: we only validate config locally before deploying.
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    # Track the new resource so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(deployed_destination.connector_id)
    # Include the management URL and quote the ID, matching the success
    # messages of `deploy_source_to_cloud` and `deploy_noop_destination_to_cloud`.
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )

Deploy a destination connector to Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def create_connection_on_cloud(
    ctx: Context,
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    table_prefix: Annotated[
        str | None,
        Field(description="Optional table prefix to use when syncing to the destination.", default=None),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    # Normalize the stream selection (accepts a CSV string or a list).
    streams: list[str] = resolve_list_of_strings(selected_streams)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=streams,
        table_prefix=table_prefix,
    )

    # Track the new resource so session-scoped cleanup/permission checks work.
    register_guid_created_in_session(connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{connection.connection_id}' and "
        f"URL: {connection.connection_url}"
    )

Create a connection between a deployed source and destination on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def run_cloud_sync(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(description="Maximum time to wait for sync completion (seconds).", default=300),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Fire-and-forget: report the job handle without polling for status.
    if not wait:
        return f"Sync started. Job ID is '{result.job_id}' and job URL is: {result.job_url}"

    status = result.get_job_status()
    return (
        f"Sync completed with status: {status}. "
        f"Job ID is '{result.job_id}' and "
        f"job URL is: {result.job_url}"
    )

Run a sync job on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(description=WORKSPACE_ID_TIP_TEXT, default=None),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, organization info, and billing status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Fetch workspace details from the public API using the workspace's own credentials.
    ws_info = api_util.get_workspace(
        workspace_id=workspace.workspace_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    # Organization info (including billing) requires ORGANIZATION_READER
    # permission, which workspace-scoped credentials may lack; fail gracefully.
    org = workspace.get_organization(raise_on_error=False)
    if org:
        return CloudWorkspaceResult(
            workspace_id=ws_info.workspace_id,
            workspace_name=ws_info.name,
            workspace_url=workspace.workspace_url,
            organization_id=org.organization_id,
            organization_name=org.organization_name,
            payment_status=org.payment_status,
            subscription_status=org.subscription_status,
            is_account_locked=org.is_account_locked,
        )

    # Org lookup failed: report the placeholder ID and let the optional
    # fields fall back to the model defaults (None / False).
    return CloudWorkspaceResult(
        workspace_id=ws_info.workspace_id,
        workspace_name=ws_info.name,
        workspace_url=workspace.workspace_url,
        organization_id="[unavailable - requires ORGANIZATION_READER permission]",
    )

Check if we have a valid Airbyte Cloud connection and return workspace info.

Returns workspace details including workspace ID, name, organization info, and billing status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def deploy_noop_destination_to_cloud( ctx: fastmcp.server.context.Context, name: str = 'No-op Destination', *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], unique: bool = True) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    ctx: Context,
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    target_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed = target_workspace.deploy_destination(
        name=name,
        destination=get_noop_destination(),
        unique=unique,
    )
    # Track the new connector so session-scoped cleanup/permission checks can find it.
    register_guid_created_in_session(deployed.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed.connector_id}' and "
        f"URL: {deployed.connector_url}"
    )

Deploy the No-op destination to Airbyte Cloud for testing purposes.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_status( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional job ID. If not provided, the latest job will be used.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_attempts: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to include detailed attempts information.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # `job_id=None` resolves to the most recent job for the connection.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
    if not sync_result:
        # No job found for the connection (or the requested job ID).
        return {"status": None, "job_id": None, "attempts": []}

    summary: dict[str, Any] = {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": [],
    }

    # Attempt details are opt-in since they require an extra lookup.
    if include_attempts:
        summary["attempts"] = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in sync_result.get_attempts()
        ]

    return summary

Get the status of a sync job from the Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_sync_jobs( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_jobs: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=20, description='Maximum number of jobs to return. Defaults to 20 if not specified. Maximum allowed value is 500.')], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description='When True, jobs are ordered newest-first (createdAt DESC). When False, jobs are ordered oldest-first (createdAt ASC). Defaults to True if `jobs_offset` is not specified. Cannot combine `from_tail=True` with `jobs_offset`.')], jobs_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of jobs to skip from the beginning. Cannot be combined with `from_tail=True`.')], job_type: typing.Annotated[airbyte_api.models.jobtypeenum.JobTypeEnum | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. If not specified, defaults to sync and reset jobs only (API default). Use 'refresh' to find refresh jobs or 'clear' to find clear jobs.")]) -> SyncJobListResult:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
    job_type: Annotated[
        JobTypeEnum | None,
        Field(
            description=(
                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
                "If not specified, defaults to sync and reset jobs only (API default). "
                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    Retrieves sync jobs for the given connection, with control over ordering
    and pagination. When no ordering option is given, jobs are returned
    newest-first (from_tail=True).
    """
    # `from_tail=True` and `jobs_offset` are mutually exclusive.
    if jobs_offset is not None and from_tail is True:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # Unspecified ordering defaults to newest-first, unless an offset was given.
    if from_tail is None:
        from_tail = jobs_offset is None

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Cap at 500 to avoid overloading agent context; non-positive values fall
    # back to the documented default of 20.
    effective_limit = 20 if max_jobs <= 0 else min(max_jobs, 500)

    jobs = [
        SyncJobResult(
            job_id=result.job_id,
            status=str(result.get_job_status()),
            bytes_synced=result.bytes_synced,
            records_synced=result.records_synced,
            start_time=result.start_time.isoformat(),
            job_url=result.job_url,
        )
        for result in connection.get_previous_sync_logs(
            limit=effective_limit,
            offset=jobs_offset,
            from_tail=from_tail,
            job_type=job_type,
        )
    ]

    return SyncJobListResult(
        jobs=jobs,
        jobs_count=len(jobs),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )

List sync jobs for a connection with pagination support.

This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_source_connectors( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter sources by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudSourceResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    matches = workspace.list_sources()

    # Optional case-insensitive substring filter on the source name.
    if name_contains:
        fragment = name_contains.lower()
        matches = [
            src for src in matches if src.name is not None and fragment in src.name.lower()
        ]

    # Optional cap on the number of returned items.
    if max_items_limit is not None:
        matches = matches[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    results: list[CloudSourceResult] = []
    for src in matches:
        results.append(
            CloudSourceResult(
                id=src.source_id,
                name=cast(str, src.name),
                url=cast(str, src.connector_url),
            )
        )
    return results

List all deployed source connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_destination_connectors( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter destinations by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudDestinationResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    matches = workspace.list_destinations()

    # Optional case-insensitive substring filter on the destination name.
    if name_contains:
        fragment = name_contains.lower()
        matches = [
            dst for dst in matches if dst.name is not None and fragment in dst.name.lower()
        ]

    # Optional cap on the number of returned items.
    if max_items_limit is not None:
        matches = matches[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    results: list[CloudDestinationResult] = []
    for dst in matches:
        results.append(
            CloudDestinationResult(
                id=dst.destination_id,
                name=cast(str, dst.name),
                url=cast(str, dst.connector_url),
            )
        )
    return results

List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the source to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudSourceDetails:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)

    # Reading `.name` ensures the lazily-loaded `_connector_info` is populated.
    resolved_name = cast(str, source.name)

    return CloudSourceDetails(
        source_id=source.source_id,
        source_name=resolved_name,
        source_url=source.connector_url,
        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed source connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the destination to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudDestinationDetails:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Reading `.name` ensures the lazily-loaded `_connector_info` is populated.
    resolved_name = cast(str, destination.name)

    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=resolved_name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001  # type: ignore[union-attr]
    )

Get detailed information about a specific deployed destination connector.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to describe.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> CloudConnectionDetails:
 988@mcp_tool(
 989    read_only=True,
 990    idempotent=True,
 991    open_world=True,
 992    extra_help_text=CLOUD_AUTH_TIP_TEXT,
 993)
 994def describe_cloud_connection(
 995    ctx: Context,
 996    connection_id: Annotated[
 997        str,
 998        Field(description="The ID of the connection to describe."),
 999    ],
1000    *,
1001    workspace_id: Annotated[
1002        str | None,
1003        Field(
1004            description=WORKSPACE_ID_TIP_TEXT,
1005            default=None,
1006        ),
1007    ],
1008) -> CloudConnectionDetails:
1009    """Get detailed information about a specific deployed connection."""
1010    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
1011    connection = workspace.get_connection(connection_id=connection_id)
1012
1013    return CloudConnectionDetails(
1014        connection_id=connection.connection_id,
1015        connection_name=cast(str, connection.name),
1016        connection_url=cast(str, connection.connection_url),
1017        source_id=connection.source_id,
1018        source_name=cast(str, connection.source.name),
1019        destination_id=connection.destination_id,
1020        destination_name=cast(str, connection.destination.name),
1021        selected_streams=connection.stream_names,
1022        table_prefix=connection.table_prefix,
1023    )

Get detailed information about a specific deployed connection.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_cloud_sync_logs( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], job_id: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional job ID. If not provided, the latest job will be used.')] = None, attempt_number: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=True, description='Optional attempt number. If not provided, the latest attempt will be used.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], max_lines: typing.Annotated[int, FieldInfo(annotation=NoneType, required=False, default=4000, description="Maximum number of lines to return. Defaults to 4000 if not specified. If '0' is provided, no limit is applied.")], from_tail: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Pull from the end of the log text if total lines is greater than 'max_lines'. Defaults to True if `line_offset` is not specified. Cannot combine `from_tail=True` with `line_offset`.")], line_offset: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Number of lines to skip from the beginning of the logs. Cannot be combined with `from_tail=True`.')]) -> LogReadResult:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # `line_offset` and `from_tail` are mutually exclusive.
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # When neither option is given, default to tailing the log.
    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()
    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    # Select the requested attempt, or the most recent one by default.
    if attempt_number is None:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)
    else:
        target_attempt = next(
            (a for a in attempts if a.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )

    full_log_text = target_attempt.get_full_log_text()
    if not full_log_text:
        # Return an empty result with zero lines.
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    all_lines = full_log_text.splitlines()
    total_lines = len(all_lines)

    # A max_lines of 0 means "no limit".
    window = total_lines if max_lines == 0 else max_lines

    # Compute the window start depending on tail vs. offset mode.
    if from_tail:
        first_index = max(0, total_lines - window)
    else:
        first_index = line_offset or 0
    window_lines = all_lines[first_index : first_index + window]

    return LogReadResult(
        log_text="\n".join(window_lines),
        log_text_start_line=first_index + 1,  # Convert to 1-based index
        log_text_line_count=len(window_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )

Get the logs from a sync job attempt on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_deployed_cloud_connections( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional case-insensitive substring to filter connections by name')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')], with_connection_status: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description="If True, include status info for each connection's most recent sync job")], failing_connections_only: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=False, description='If True, only return connections with failed/cancelled last sync')]) -> list[CloudConnectionResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List the connections deployed in the Airbyte Cloud workspace.

    Setting with_connection_status to True augments each entry with the
    status of its most recent *completed* sync job; in-progress syncs are
    reported separately and skipped when finding the last completed job.

    Setting failing_connections_only to True restricts the output to
    connections whose last completed sync failed or was cancelled.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    candidates = workspace.list_connections()

    # Optional case-insensitive substring filter on the connection name.
    if name_contains:
        lowered = name_contains.lower()
        candidates = [
            conn for conn in candidates if conn.name is not None and lowered in conn.name.lower()
        ]

    # Filtering to failing connections requires status lookups.
    include_status = bool(with_connection_status or failing_connections_only)

    output: list[CloudConnectionResult] = []
    for conn in candidates:
        status_text: str | None = None
        completed_job_id: int | None = None
        completed_job_time: str | None = None
        running_job_id: int | None = None
        running_job_start: str | None = None

        if include_status:
            completed_status = None  # Enum retained for the failure check below.
            for log_entry in conn.get_previous_sync_logs(limit=5):
                entry_status = log_entry.get_job_status()

                if not log_entry.is_job_complete():
                    # Note the in-progress sync and keep scanning for a completed one.
                    running_job_id = log_entry.job_id
                    running_job_start = log_entry.start_time.isoformat()
                    continue

                completed_status = entry_status
                status_text = str(entry_status.value) if entry_status else None
                completed_job_id = log_entry.job_id
                completed_job_time = log_entry.start_time.isoformat()
                break

            # Skip connections whose last completed sync did not fail.
            found_failure = completed_status is not None and completed_status in FAILED_STATUSES
            if failing_connections_only and not found_failure:
                continue

        output.append(
            CloudConnectionResult(
                id=conn.connection_id,
                name=cast(str, conn.name),
                url=cast(str, conn.connection_url),
                source_id=conn.source_id,
                destination_id=conn.destination_id,
                last_job_status=status_text,
                last_job_id=completed_job_id,
                last_job_time=completed_job_time,
                currently_running_job_id=running_job_id,
                currently_running_job_start_time=running_job_start,
            )
        )

        if max_items_limit is not None and len(output) >= max_items_limit:
            break

    return output

List all deployed connections in the Airbyte Cloud workspace.

When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.

When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def list_cloud_workspaces( ctx: fastmcp.server.context.Context, *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')], name_contains: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional substring to filter workspaces by name (server-side filtering)')], max_items_limit: typing.Annotated[int | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional maximum number of items to return (default: no limit)')]) -> list[CloudWorkspaceResult]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.

    Returns:
        One CloudWorkspaceResult per workspace in the resolved organization.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Wrap the raw credential strings once, so both API calls below receive
    # identical SecretString (or None) values instead of rebuilding them inline.
    client_id_secret = SecretString(client_id) if client_id else None
    client_secret_secret = SecretString(client_secret) if client_secret else None
    bearer_token_secret = SecretString(bearer_token) if bearer_token else None

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
    )

    workspaces = api_util.list_workspaces_in_organization(
        organization_id=resolved_org_id,
        api_root=api_url,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [
        CloudWorkspaceResult(
            workspace_id=ws.get("workspaceId", ""),
            workspace_name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]

List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def describe_cloud_organization( ctx: fastmcp.server.context.Context, *, organization_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization ID. Required if organization_name is not provided.')], organization_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Organization name (exact match). Required if organization_id is not provided.')]) -> CloudOrganizationResult:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.

    Returns:
        A CloudOrganizationResult with identity, contact, and billing fields.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    # Wrap the raw credential strings once instead of rebuilding each inline.
    client_id_secret = SecretString(client_id) if client_id else None
    client_secret_secret = SecretString(client_secret) if client_secret else None
    bearer_token_secret = SecretString(bearer_token) if bearer_token else None

    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=client_id_secret,
        client_secret=client_secret_secret,
        bearer_token=bearer_token_secret,
    )

    # CloudOrganization lazily loads the billing properties accessed below.
    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )

Get details about a specific organization including billing status.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def publish_custom_source_definition( ctx: fastmcp.server.context.Context, name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The name for the custom connector definition.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The Low-code CDK manifest as a YAML string or file path. Required for YAML connectors.')] = None, unique: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to require a unique name.')] = True, pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before publishing.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project, allowing immediate test read operations.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A single-line string is interpreted as a path to a manifest file;
    # anything else (multi-line YAML text, a Path, or None) passes through.
    resolved_manifest: str | Path | None
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        resolved_manifest = Path(manifest_yaml)
    else:
        resolved_manifest = manifest_yaml

    # Resolve testing values from the inline config and/or the named secret.
    resolved_testing_values: dict[str, Any] | None = None
    if not (testing_values is None and testing_values_secret_name is None):
        resolved_testing_values = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    published = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=resolved_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=resolved_testing_values,
    )
    register_guid_created_in_session(published.definition_id)

    summary = _get_custom_source_definition_description(custom_source=published)
    return f"Successfully published custom YAML source definition:\n{summary}\n"

Publish a custom YAML source connector definition to Airbyte Cloud.

Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True)
def list_custom_source_definitions( ctx: fastmcp.server.context.Context, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> list[dict[str, typing.Any]]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    summaries: list[dict[str, Any]] = []
    for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
        summaries.append(
            {
                "definition_id": definition.definition_id,
                "name": definition.name,
                "version": definition.version,
                "connector_builder_project_url": definition.connector_builder_project_url,
            }
        )
    return summaries

List custom YAML source definitions in the Airbyte Cloud workspace.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(read_only=True, idempotent=True, open_world=True)
def get_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to retrieve.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], include_draft: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description="Whether to include the Connector Builder draft manifest in the response. If True and a draft exists, the response will include 'has_draft' and 'draft_manifest' fields. Defaults to False.")] = False) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_draft: Annotated[
        bool,
        Field(
            description=(
                "Whether to include the Connector Builder draft manifest in the response. "
                "If True and a draft exists, the response will include 'has_draft' and "
                "'draft_manifest' fields. Defaults to False."
            ),
            default=False,
        ),
    ] = False,
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the published manifest YAML content.
    Optionally includes the Connector Builder draft manifest (unpublished changes)
    when include_draft=True.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    details: dict[str, Any] = {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "version": definition.version,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "manifest": definition.manifest,
    }

    # Draft fields are only surfaced when explicitly requested.
    if include_draft:
        details["has_draft"] = definition.has_draft
        details["draft_manifest"] = definition.draft_manifest

    return details

Get a custom YAML source definition from Airbyte Cloud, including its manifest.

Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(read_only=True, idempotent=True, open_world=True)
def get_connector_builder_draft_manifest( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to retrieve the draft for.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_connector_builder_draft_manifest(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve the draft for."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get the Connector Builder draft manifest for a custom source definition.

    Returns the working draft manifest that has been saved in the Connector Builder UI
    but not yet published. This is useful for inspecting what a user is currently working
    on before they publish their changes.

    If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None.
    The published manifest is always included for comparison.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    response: dict[str, Any] = {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
    }
    # Draft state plus the published manifest, for side-by-side comparison.
    response["has_draft"] = definition.has_draft
    response["draft_manifest"] = definition.draft_manifest
    response["published_manifest"] = definition.manifest
    return response

Get the Connector Builder draft manifest for a custom source definition.

Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.

If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.

@mcp_tool(destructive=True, open_world=True)
def update_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the definition to update.')], manifest_yaml: typing.Annotated[str | pathlib.Path | None, FieldInfo(annotation=NoneType, required=False, default=None, description='New manifest as YAML string or file path. Optional; omit to update only testing values.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')], pre_validate: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=True, description='Whether to validate the manifest client-side before updating.')] = True, testing_values: typing.Annotated[dict | str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Optional testing configuration values for the Builder UI. Can be provided as a JSON object or JSON string. Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. If provided, these values replace any existing testing values for the connector builder project. The entire testing values object is overwritten, so pass the full set of values you want to persist.")], testing_values_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Optional name of a secret containing testing configuration values in JSON or YAML format. The secret will be resolved by the MCP server and merged into testing_values, with secret values taking precedence. This lets the agent reference secrets without sending raw values as tool arguments.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Reject a no-op call: the caller must be updating *something*.
    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    # A single-line string is interpreted as a path to a manifest file;
    # anything containing a newline is treated as inline YAML content.
    manifest_input: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest_input = Path(manifest_yaml)

    # Fold inline testing values and any secret-stored values into one dict.
    resolved_testing_values: dict[str, Any] | None = None
    if not (testing_values is None and testing_values_secret_name is None):
        resolved_testing_values = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    custom_source: CustomCloudSourceDefinition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    if manifest_input is not None:
        custom_source = custom_source.update_definition(
            manifest_yaml=manifest_input,
            pre_validate=pre_validate,
        )

    if resolved_testing_values is not None:
        custom_source.set_testing_values(resolved_testing_values)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )

Update a custom YAML source definition in Airbyte Cloud.

Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.

@mcp_tool(destructive=True, open_world=True)
def permanently_delete_custom_source_definition( ctx: fastmcp.server.context.Context, definition_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the custom source definition to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the custom source definition (for verification).')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    found_name: str = target.name

    # Guard against deleting the wrong resource: names must match exactly.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # safe_mode is deliberately hard-coded for extra protection when driven by LLM agents.
    target.permanently_delete(
        safe_mode=True,
    )
    return f"Successfully deleted custom source definition '{found_name}' (ID: {definition_id})"

Permanently delete a custom YAML source definition from Airbyte Cloud.

IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).

If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.

The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.

Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the source (for verification).')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    deployed_source = workspace.get_source(source_id=source_id)
    found_name: str = cast(str, deployed_source.name)

    # Guard against deleting the wrong resource: names must match exactly.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # safe_mode stays hard-coded: deletion also requires a "delete-me"/"deleteme"
    # marker in the name (case insensitive), extra protection for LLM agents.
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,
    )
    return f"Successfully deleted source '{found_name}' (ID: {source_id})"

Permanently delete a deployed source connector from Airbyte Cloud.

IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).

If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.

The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the destination (for verification).')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    deployed_destination = workspace.get_destination(destination_id=destination_id)
    found_name: str = cast(str, deployed_destination.name)

    # Guard against deleting the wrong resource: names must match exactly.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # safe_mode stays hard-coded: deletion also requires a "delete-me"/"deleteme"
    # marker in the name (case insensitive), extra protection for LLM agents.
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,
    )
    return f"Successfully deleted destination '{found_name}' (ID: {destination_id})"

Permanently delete a deployed destination connector from Airbyte Cloud.

IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).

If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.

The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def permanently_delete_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to delete.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The expected name of the connection (for verification).')], *, cascade_delete_source: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the source connector associated with this connection.')] = False, cascade_delete_destination: typing.Annotated[bool, FieldInfo(annotation=NoneType, required=False, default=False, description='Whether to also delete the destination connector associated with this connection.')] = False) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    target_connection = workspace.get_connection(connection_id=connection_id)
    found_name: str = cast(str, target_connection.name)

    # Guard against deleting the wrong resource: names must match exactly.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    # safe_mode stays hard-coded: deletion also requires a "delete-me"/"deleteme"
    # marker in the name (case insensitive), extra protection for LLM agents.
    workspace.permanently_delete_connection(
        safe_mode=True,
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{found_name}' (ID: {connection_id})"

Permanently delete a connection from Airbyte Cloud.

IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).

If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.

The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_source( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the source.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_source = workspace.get_source(source_id=source_id)
    deployed_source.rename(name=name)
    return (
        f"Successfully renamed source '{source_id}' to '{name}'. "
        f"URL: {deployed_source.connector_url}"
    )

Rename a deployed source connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_source_config( ctx: fastmcp.server.context.Context, source_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed source to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the source connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')] = None, *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_source = workspace.get_source(source_id=source_id)

    # Resolve inline config and/or secret-stored config into a plain dict.
    resolved_config = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # Connector spec is not available at this point.
    )

    deployed_source.update_config(config=resolved_config)
    return f"Successfully updated source '{source_id}'. URL: {deployed_source.connector_url}"

Update a deployed source connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_destination( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.get_destination(destination_id=destination_id)
    deployed_destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {deployed_destination.connector_url}"
    )

Rename a deployed destination connector on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_destination_config( ctx: fastmcp.server.context.Context, destination_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the deployed destination to update.')], config: typing.Annotated[dict | str, FieldInfo(annotation=NoneType, required=True, description='New configuration for the destination connector.')], config_secret_name: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='The name of the secret containing the configuration.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
        # Python-level default added so the parameter is truly optional for direct
        # callers, matching both the Field(default=None) declared above and the
        # sibling tool `update_cloud_source_config`.
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Resolve inline config and/or secret-stored config into a plain dict.
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )

Update a deployed destination connector's configuration on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def rename_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to rename.')], name: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New name for the connection.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    # Resolve the workspace (falls back to the env-var default when
    # workspace_id is None), fetch the connection, and apply the rename.
    target_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)
    target_connection.rename(name=name)
    result_message = (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {target_connection.connection_url}"
    )
    return result_message

Rename a connection on Airbyte Cloud.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_table_prefix( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], prefix: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='New table prefix to use when syncing to the destination.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # Guard: only allow mutation of resources created during this session.
    check_guid_created_in_session(connection_id)
    target_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)
    target_connection.set_table_prefix(prefix=prefix)
    result_message = (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {target_connection.connection_url}"
    )
    return result_message

Set the table prefix for a connection on Airbyte Cloud.

This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(destructive=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def set_cloud_connection_selected_streams( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], stream_names: typing.Annotated[str | list[str], FieldInfo(annotation=NoneType, required=True, description='The selected stream names to sync within the connection. Must be an explicit stream name or list of streams.')], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # Guard: only allow mutation of resources created during this session.
    check_guid_created_in_session(connection_id)
    target_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)

    # Normalize a single stream name or delimited string into a list of names.
    selected_streams: list[str] = resolve_list_of_strings(stream_names)
    target_connection.set_selected_streams(stream_names=selected_streams)

    result_message = (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {selected_streams}. URL: {target_connection.connection_url}"
    )
    return result_message

Set the selected streams for a connection on Airbyte Cloud.

This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(open_world=True, destructive=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def update_cloud_connection( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the connection to update.')], *, enabled: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set the connection's enabled status. True enables the connection (status='active'), False disables it (status='inactive'). Leave unset to keep the current status.")], cron_expression: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description="A cron expression defining when syncs should run. Examples: '0 0 * * *' (daily at midnight UTC), '0 */6 * * *' (every 6 hours), '0 0 * * 0' (weekly on Sunday at midnight UTC). Leave unset to keep the current schedule. Cannot be used together with 'manual_schedule'.")], manual_schedule: typing.Annotated[bool | None, FieldInfo(annotation=NoneType, required=False, default=None, description="Set to True to disable automatic syncs (manual scheduling only). Syncs will only run when manually triggered. Cannot be used together with 'cron_expression'.")], workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> str:
@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive. Note that 'manual_schedule=False' requests no
    change and is treated the same as leaving it unset.

    Raises:
        ValueError: If no effective setting is provided, or if both
            'cron_expression' and 'manual_schedule=True' are given.
    """
    # Guard: only allow mutation of resources created during this session.
    check_guid_created_in_session(connection_id)

    # Validate that at least one *effective* setting is provided.
    # BUGFIX: 'manual_schedule=False' is a no-op (only True switches to manual
    # scheduling), so on its own it must not pass validation — previously it
    # produced a misleading success message with an empty changes summary.
    if enabled is None and cron_expression is None and not manual_schedule:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # Validate mutually exclusive schedule options.
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    changes_made: list[str] = []

    # Apply enabled status change (True -> 'active', False -> 'inactive').
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change. Cron takes effect when given; otherwise
    # manual_schedule=True switches off automatic syncs.
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )

Update a connection's settings on Airbyte Cloud.

This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)

At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

@mcp_tool(read_only=True, idempotent=True, open_world=True, extra_help_text=CLOUD_AUTH_TIP_TEXT)
def get_connection_artifact( ctx: fastmcp.server.context.Context, connection_id: typing.Annotated[str, FieldInfo(annotation=NoneType, required=True, description='The ID of the Airbyte Cloud connection.')], artifact_type: Annotated[Literal['state', 'catalog'], FieldInfo(annotation=NoneType, required=True, description="The type of artifact to retrieve: 'state' or 'catalog'.")], *, workspace_id: typing.Annotated[str | None, FieldInfo(annotation=NoneType, required=False, default=None, description='Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var.')]) -> dict[str, typing.Any]:
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the full raw connection state including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    target_workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    target_connection = target_workspace.get_connection(connection_id=connection_id)

    if artifact_type == "catalog":
        # A missing catalog comes back as None; surface an explicit error dict.
        catalog = target_connection.dump_raw_catalog()
        if catalog is None:
            return {"ERROR": "No catalog found for this connection"}
        return catalog

    # artifact_type == "state"
    state = target_connection.dump_raw_state()
    # An unset state is reported via its stateType marker rather than None.
    if state.get("stateType") == "not_set":
        return {"ERROR": "No state is set for this connection (stateType: not_set)"}
    return state

Get a connection artifact (state or catalog) from Airbyte Cloud.

Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all
  state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
  or {"ERROR": "..."} if not found.

By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.

def register_cloud_tools(app: fastmcp.server.server.FastMCP) -> None:
def register_cloud_tools(app: FastMCP) -> None:
    """Register cloud tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    # When a workspace ID is pre-configured via env var, hide the
    # 'workspace_id' argument from every tool and bake in its default.
    excluded_args: list[str] | None = None
    if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET:
        excluded_args = ["workspace_id"]
        _add_defaults_for_exclude_args(excluded_args)
    register_mcp_tools(
        app,
        mcp_module=__name__,
        exclude_args=excluded_args,
    )

Register cloud tools with the FastMCP app.

Arguments:
  • app: FastMCP application instance