airbyte.mcp.cloud_ops
Airbyte Cloud MCP operations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations.""" 3 4from pathlib import Path 5from typing import Annotated, Any, Literal, cast 6 7from fastmcp import FastMCP 8from pydantic import BaseModel, Field 9 10from airbyte import cloud, get_destination, get_source 11from airbyte._util import api_util 12from airbyte.cloud.connectors import CustomCloudSourceDefinition 13from airbyte.cloud.constants import FAILED_STATUSES 14from airbyte.cloud.workspaces import CloudWorkspace 15from airbyte.destinations.util import get_noop_destination 16from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError 17from airbyte.mcp._tool_utils import ( 18 check_guid_created_in_session, 19 mcp_tool, 20 register_guid_created_in_session, 21 register_tools, 22) 23from airbyte.mcp._util import ( 24 resolve_cloud_credentials, 25 resolve_config, 26 resolve_list_of_strings, 27 resolve_workspace_id, 28) 29from airbyte.secrets import SecretString 30 31 32CLOUD_AUTH_TIP_TEXT = ( 33 "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, " 34 "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables " 35 "will be used to authenticate with the Airbyte Cloud API." 36) 37WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var." 38 39 40class CloudSourceResult(BaseModel): 41 """Information about a deployed source connector in Airbyte Cloud.""" 42 43 id: str 44 """The source ID.""" 45 name: str 46 """Display name of the source.""" 47 url: str 48 """Web URL for managing this source in Airbyte Cloud.""" 49 50 51class CloudDestinationResult(BaseModel): 52 """Information about a deployed destination connector in Airbyte Cloud.""" 53 54 id: str 55 """The destination ID.""" 56 name: str 57 """Display name of the destination.""" 58 url: str 59 """Web URL for managing this destination in Airbyte Cloud.""" 60 61 62class CloudConnectionResult(BaseModel): 63 """Information about a deployed connection in Airbyte Cloud.""" 64 65 id: str 66 """The connection ID.""" 67 name: str 68 """Display name of the connection.""" 69 url: str 70 """Web URL for managing this connection in Airbyte Cloud.""" 71 source_id: str 72 """ID of the source used by this connection.""" 73 destination_id: str 74 """ID of the destination used by this connection.""" 75 last_job_status: str | None = None 76 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 77 Only populated when with_connection_status=True.""" 78 last_job_id: int | None = None 79 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 80 last_job_time: str | None = None 81 """ISO 8601 timestamp of the most recent completed sync. 82 Only populated when with_connection_status=True.""" 83 currently_running_job_id: int | None = None 84 """Job ID of a currently running sync, if any. 85 Only populated when with_connection_status=True.""" 86 currently_running_job_start_time: str | None = None 87 """ISO 8601 timestamp of when the currently running sync started. 88 Only populated when with_connection_status=True.""" 89 90 91class CloudSourceDetails(BaseModel): 92 """Detailed information about a deployed source connector in Airbyte Cloud.""" 93 94 source_id: str 95 """The source ID.""" 96 source_name: str 97 """Display name of the source.""" 98 source_url: str 99 """Web URL for managing this source in Airbyte Cloud.""" 100 connector_definition_id: str 101 """The connector definition ID (e.g., the ID for 'source-postgres').""" 102 103 104class CloudDestinationDetails(BaseModel): 105 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 106 107 destination_id: str 108 """The destination ID.""" 109 destination_name: str 110 """Display name of the destination.""" 111 destination_url: str 112 """Web URL for managing this destination in Airbyte Cloud.""" 113 connector_definition_id: str 114 """The connector definition ID (e.g., the ID for 'destination-snowflake').""" 115 116 117class CloudConnectionDetails(BaseModel): 118 """Detailed information about a deployed connection in Airbyte Cloud.""" 119 120 connection_id: str 121 """The connection ID.""" 122 connection_name: str 123 """Display name of the connection.""" 124 connection_url: str 125 """Web URL for managing this connection in Airbyte Cloud.""" 126 source_id: str 127 """ID of the source used by this connection.""" 128 source_name: str 129 """Display name of the source.""" 130 destination_id: str 131 """ID of the destination used by this connection.""" 132 destination_name: str 133 """Display name of the destination.""" 134 selected_streams: list[str] 135 """List of stream names selected for syncing.""" 136 table_prefix: str | None 137 """Table prefix applied when syncing to the destination.""" 138 139 140class CloudOrganizationResult(BaseModel): 141 """Information about an organization in Airbyte Cloud.""" 142 143 id: str 144 """The organization ID.""" 145 name: str 146 """Display name of the organization.""" 147 email: str 148 """Email associated with the organization.""" 149 150 151class CloudWorkspaceResult(BaseModel): 152 """Information about a workspace in Airbyte Cloud.""" 153 154 workspace_id: str 155 """The workspace ID.""" 156 workspace_name: str 157 """Display name of the workspace.""" 158 workspace_url: str | None = None 159 """URL to access the workspace in Airbyte Cloud.""" 160 organization_id: str 161 """ID of the organization (requires ORGANIZATION_READER permission).""" 162 organization_name: str | None = None 163 """Name of the organization (requires ORGANIZATION_READER permission).""" 164 165 166class LogReadResult(BaseModel): 167 """Result of reading sync logs with pagination support.""" 168 169 job_id: int 170 """The job ID the logs belong to.""" 171 attempt_number: int 172 """The attempt number the logs belong to.""" 173 log_text: str 174 """The string containing the log text we are returning.""" 175 log_text_start_line: int 176 """1-based line index of the first line returned.""" 177 log_text_line_count: int 178 """Count of lines we are returning.""" 179 total_log_lines_available: int 180 """Total number of log lines available, shows if any lines were missed due to the limit.""" 181 182 183class SyncJobResult(BaseModel): 184 """Information about a sync job.""" 185 186 job_id: int 187 """The job ID.""" 188 status: str 189 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 190 bytes_synced: int 191 """Number of bytes synced in this job.""" 192 records_synced: int 193 """Number of records synced in this job.""" 194 start_time: str 195 """ISO 8601 timestamp of when the job started.""" 196 job_url: str 197 """URL to view the job in Airbyte Cloud.""" 198 199 200class SyncJobListResult(BaseModel): 201 """Result of listing sync jobs with pagination support.""" 202 203 jobs: list[SyncJobResult] 204 """List of sync jobs.""" 205 jobs_count: int 206 """Number of jobs returned in this response.""" 207 jobs_offset: int 208 """Offset used for this request (0 if not specified).""" 209 from_tail: bool 210 """Whether jobs are ordered newest-first (True) or oldest-first (False).""" 211 212 213def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace: 214 """Get an authenticated CloudWorkspace. 215 216 Resolves credentials from multiple sources in order: 217 1. HTTP headers (when running as MCP server with HTTP/SSE transport) 218 2. Environment variables 219 220 Args: 221 workspace_id: Optional workspace ID. If not provided, uses HTTP headers 222 or the AIRBYTE_CLOUD_WORKSPACE_ID environment variable. 223 """ 224 credentials = resolve_cloud_credentials() 225 resolved_workspace_id = resolve_workspace_id(workspace_id) 226 227 return CloudWorkspace( 228 workspace_id=resolved_workspace_id, 229 client_id=credentials.client_id, 230 client_secret=credentials.client_secret, 231 bearer_token=credentials.bearer_token, 232 api_root=credentials.api_root, 233 ) 234 235 236@mcp_tool( 237 domain="cloud", 238 open_world=True, 239 extra_help_text=CLOUD_AUTH_TIP_TEXT, 240) 241def deploy_source_to_cloud( 242 source_name: Annotated[ 243 str, 244 Field(description="The name to use when deploying the source."), 245 ], 246 source_connector_name: Annotated[ 247 str, 248 Field(description="The name of the source connector (e.g., 'source-faker')."), 249 ], 250 *, 251 workspace_id: Annotated[ 252 str | None, 253 Field( 254 description=WORKSPACE_ID_TIP_TEXT, 255 default=None, 256 ), 257 ], 258 config: Annotated[ 259 dict | str | None, 260 Field( 261 description="The configuration for the source connector.", 262 default=None, 263 ), 264 ], 265 config_secret_name: Annotated[ 266 str | None, 267 Field( 268 description="The name of the secret containing the configuration.", 269 default=None, 270 ), 271 ], 272 unique: Annotated[ 273 bool, 274 Field( 275 description="Whether to require a unique name.", 276 default=True, 277 ), 278 ], 279) -> str: 280 """Deploy a source connector to Airbyte Cloud.""" 281 source = get_source( 282 source_connector_name, 283 no_executor=True, 284 ) 285 config_dict = resolve_config( 286 config=config, 287 config_secret_name=config_secret_name, 288 config_spec_jsonschema=source.config_spec, 289 ) 290 source.set_config(config_dict, validate=True) 291 292 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 293 deployed_source = workspace.deploy_source( 294 name=source_name, 295 source=source, 296 unique=unique, 297 ) 298 299 register_guid_created_in_session(deployed_source.connector_id) 300 return ( 301 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 302 f" and URL: {deployed_source.connector_url}" 303 ) 304 305 306@mcp_tool( 307 domain="cloud", 308 open_world=True, 309 extra_help_text=CLOUD_AUTH_TIP_TEXT, 310) 311def deploy_destination_to_cloud( 312 destination_name: Annotated[ 313 str, 314 Field(description="The name to use when deploying the destination."), 315 ], 316 destination_connector_name: Annotated[ 317 str, 318 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 319 ], 320 *, 321 workspace_id: Annotated[ 322 str | None, 323 Field( 324 description=WORKSPACE_ID_TIP_TEXT, 325 default=None, 326 ), 327 ], 328 config: Annotated[ 329 dict | str | None, 330 Field( 331 description="The configuration for the destination connector.", 332 default=None, 333 ), 334 ], 335 config_secret_name: Annotated[ 336 str | None, 337 Field( 338 description="The name of the secret containing the configuration.", 339 default=None, 340 ), 341 ], 342 unique: Annotated[ 343 bool, 344 Field( 345 description="Whether to require a unique name.", 346 default=True, 347 ), 348 ], 349) -> str: 350 """Deploy a destination connector to Airbyte Cloud.""" 351 destination = get_destination( 352 destination_connector_name, 353 no_executor=True, 354 ) 355 config_dict = resolve_config( 356 config=config, 357 config_secret_name=config_secret_name, 358 config_spec_jsonschema=destination.config_spec, 359 ) 360 destination.set_config(config_dict, validate=True) 361 362 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 363 deployed_destination = workspace.deploy_destination( 364 name=destination_name, 365 destination=destination, 366 unique=unique, 367 ) 368 369 register_guid_created_in_session(deployed_destination.connector_id) 370 return ( 371 f"Successfully deployed destination '{destination_name}' " 372 f"with ID: {deployed_destination.connector_id}" 373 ) 374 375 376@mcp_tool( 377 domain="cloud", 378 open_world=True, 379 extra_help_text=CLOUD_AUTH_TIP_TEXT, 380) 381def create_connection_on_cloud( 382 connection_name: Annotated[ 383 str, 384 Field(description="The name of the connection."), 385 ], 386 source_id: Annotated[ 387 str, 388 Field(description="The ID of the deployed source."), 389 ], 390 destination_id: Annotated[ 391 str, 392 Field(description="The ID of the deployed destination."), 393 ], 394 selected_streams: Annotated[ 395 str | list[str], 396 Field( 397 description=( 398 "The selected stream names to sync within the connection. " 399 "Must be an explicit stream name or list of streams. " 400 "Cannot be empty or '*'." 401 ) 402 ), 403 ], 404 *, 405 workspace_id: Annotated[ 406 str | None, 407 Field( 408 description=WORKSPACE_ID_TIP_TEXT, 409 default=None, 410 ), 411 ], 412 table_prefix: Annotated[ 413 str | None, 414 Field( 415 description="Optional table prefix to use when syncing to the destination.", 416 default=None, 417 ), 418 ], 419) -> str: 420 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 421 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 422 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 423 deployed_connection = workspace.deploy_connection( 424 connection_name=connection_name, 425 source=source_id, 426 destination=destination_id, 427 selected_streams=resolved_streams_list, 428 table_prefix=table_prefix, 429 ) 430 431 register_guid_created_in_session(deployed_connection.connection_id) 432 return ( 433 f"Successfully created connection '{connection_name}' " 434 f"with ID '{deployed_connection.connection_id}' and " 435 f"URL: {deployed_connection.connection_url}" 436 ) 437 438 439@mcp_tool( 440 domain="cloud", 441 open_world=True, 442 extra_help_text=CLOUD_AUTH_TIP_TEXT, 443) 444def run_cloud_sync( 445 connection_id: Annotated[ 446 str, 447 Field(description="The ID of the Airbyte Cloud connection."), 448 ], 449 *, 450 workspace_id: Annotated[ 451 str | None, 452 Field( 453 description=WORKSPACE_ID_TIP_TEXT, 454 default=None, 455 ), 456 ], 457 wait: Annotated[ 458 bool, 459 Field( 460 description=( 461 "Whether to wait for the sync to complete. Since a sync can take between several " 462 "minutes and several hours, this option is not recommended for most " 463 "scenarios." 464 ), 465 default=False, 466 ), 467 ], 468 wait_timeout: Annotated[ 469 int, 470 Field( 471 description="Maximum time to wait for sync completion (seconds).", 472 default=300, 473 ), 474 ], 475) -> str: 476 """Run a sync job on Airbyte Cloud.""" 477 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 478 connection = workspace.get_connection(connection_id=connection_id) 479 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 480 481 if wait: 482 status = sync_result.get_job_status() 483 return ( 484 f"Sync completed with status: {status}. " 485 f"Job ID is '{sync_result.job_id}' and " 486 f"job URL is: {sync_result.job_url}" 487 ) 488 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}" 489 490 491@mcp_tool( 492 domain="cloud", 493 read_only=True, 494 idempotent=True, 495 open_world=True, 496 extra_help_text=CLOUD_AUTH_TIP_TEXT, 497) 498def check_airbyte_cloud_workspace( 499 *, 500 workspace_id: Annotated[ 501 str | None, 502 Field( 503 description=WORKSPACE_ID_TIP_TEXT, 504 default=None, 505 ), 506 ], 507) -> CloudWorkspaceResult: 508 """Check if we have a valid Airbyte Cloud connection and return workspace info. 509 510 Returns workspace details including workspace ID, name, and organization info. 511 """ 512 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 513 514 # Get workspace details from the public API using workspace's credentials 515 workspace_response = api_util.get_workspace( 516 workspace_id=workspace.workspace_id, 517 api_root=workspace.api_root, 518 client_id=workspace.client_id, 519 client_secret=workspace.client_secret, 520 bearer_token=workspace.bearer_token, 521 ) 522 523 # Try to get organization info, but fail gracefully if we don't have permissions. 524 # Fetching organization info requires ORGANIZATION_READER permissions on the organization, 525 # which may not be available with workspace-scoped credentials. 526 organization = workspace.get_organization(raise_on_error=False) 527 528 return CloudWorkspaceResult( 529 workspace_id=workspace_response.workspace_id, 530 workspace_name=workspace_response.name, 531 workspace_url=workspace.workspace_url, 532 organization_id=( 533 organization.organization_id 534 if organization 535 else "[unavailable - requires ORGANIZATION_READER permission]" 536 ), 537 organization_name=organization.organization_name if organization else None, 538 ) 539 540 541@mcp_tool( 542 domain="cloud", 543 open_world=True, 544 extra_help_text=CLOUD_AUTH_TIP_TEXT, 545) 546def deploy_noop_destination_to_cloud( 547 name: str = "No-op Destination", 548 *, 549 workspace_id: Annotated[ 550 str | None, 551 Field( 552 description=WORKSPACE_ID_TIP_TEXT, 553 default=None, 554 ), 555 ], 556 unique: bool = True, 557) -> str: 558 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 559 destination = get_noop_destination() 560 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 561 deployed_destination = workspace.deploy_destination( 562 name=name, 563 destination=destination, 564 unique=unique, 565 ) 566 register_guid_created_in_session(deployed_destination.connector_id) 567 return ( 568 f"Successfully deployed No-op Destination " 569 f"with ID '{deployed_destination.connector_id}' and " 570 f"URL: {deployed_destination.connector_url}" 571 ) 572 573 574@mcp_tool( 575 domain="cloud", 576 read_only=True, 577 idempotent=True, 578 open_world=True, 579 extra_help_text=CLOUD_AUTH_TIP_TEXT, 580) 581def get_cloud_sync_status( 582 connection_id: Annotated[ 583 str, 584 Field( 585 description="The ID of the Airbyte Cloud connection.", 586 ), 587 ], 588 job_id: Annotated[ 589 int | None, 590 Field( 591 description="Optional job ID. If not provided, the latest job will be used.", 592 default=None, 593 ), 594 ], 595 *, 596 workspace_id: Annotated[ 597 str | None, 598 Field( 599 description=WORKSPACE_ID_TIP_TEXT, 600 default=None, 601 ), 602 ], 603 include_attempts: Annotated[ 604 bool, 605 Field( 606 description="Whether to include detailed attempts information.", 607 default=False, 608 ), 609 ], 610) -> dict[str, Any]: 611 """Get the status of a sync job from the Airbyte Cloud.""" 612 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 613 connection = workspace.get_connection(connection_id=connection_id) 614 615 # If a job ID is provided, get the job by ID. 616 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 617 618 if not sync_result: 619 return {"status": None, "job_id": None, "attempts": []} 620 621 result = { 622 "status": sync_result.get_job_status(), 623 "job_id": sync_result.job_id, 624 "bytes_synced": sync_result.bytes_synced, 625 "records_synced": sync_result.records_synced, 626 "start_time": sync_result.start_time.isoformat(), 627 "job_url": sync_result.job_url, 628 "attempts": [], 629 } 630 631 if include_attempts: 632 attempts = sync_result.get_attempts() 633 result["attempts"] = [ 634 { 635 "attempt_number": attempt.attempt_number, 636 "attempt_id": attempt.attempt_id, 637 "status": attempt.status, 638 "bytes_synced": attempt.bytes_synced, 639 "records_synced": attempt.records_synced, 640 "created_at": attempt.created_at.isoformat(), 641 } 642 for attempt in attempts 643 ] 644 645 return result 646 647 648@mcp_tool( 649 domain="cloud", 650 read_only=True, 651 idempotent=True, 652 open_world=True, 653 extra_help_text=CLOUD_AUTH_TIP_TEXT, 654) 655def list_cloud_sync_jobs( 656 connection_id: Annotated[ 657 str, 658 Field(description="The ID of the Airbyte Cloud connection."), 659 ], 660 *, 661 workspace_id: Annotated[ 662 str | None, 663 Field( 664 description=WORKSPACE_ID_TIP_TEXT, 665 default=None, 666 ), 667 ], 668 max_jobs: Annotated[ 669 int, 670 Field( 671 description=( 672 "Maximum number of jobs to return. " 673 "Defaults to 20 if not specified. " 674 "Maximum allowed value is 500." 675 ), 676 default=20, 677 ), 678 ], 679 from_tail: Annotated[ 680 bool | None, 681 Field( 682 description=( 683 "When True, jobs are ordered newest-first (createdAt DESC). " 684 "When False, jobs are ordered oldest-first (createdAt ASC). " 685 "Defaults to True if `jobs_offset` is not specified. " 686 "Cannot combine `from_tail=True` with `jobs_offset`." 687 ), 688 default=None, 689 ), 690 ], 691 jobs_offset: Annotated[ 692 int | None, 693 Field( 694 description=( 695 "Number of jobs to skip from the beginning. " 696 "Cannot be combined with `from_tail=True`." 697 ), 698 default=None, 699 ), 700 ], 701) -> SyncJobListResult: 702 """List sync jobs for a connection with pagination support. 703 704 This tool allows you to retrieve a list of sync jobs for a connection, 705 with control over ordering and pagination. By default, jobs are returned 706 newest-first (from_tail=True). 707 """ 708 # Validate that jobs_offset and from_tail are not both set 709 if jobs_offset is not None and from_tail is True: 710 raise PyAirbyteInputError( 711 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 712 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 713 ) 714 715 # Default to from_tail=True if neither is specified 716 if from_tail is None and jobs_offset is None: 717 from_tail = True 718 elif from_tail is None: 719 from_tail = False 720 721 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 722 connection = workspace.get_connection(connection_id=connection_id) 723 724 # Cap at 500 to avoid overloading agent context 725 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 726 727 sync_results = connection.get_previous_sync_logs( 728 limit=effective_limit, 729 offset=jobs_offset, 730 from_tail=from_tail, 731 ) 732 733 jobs = [ 734 SyncJobResult( 735 job_id=sync_result.job_id, 736 status=str(sync_result.get_job_status()), 737 bytes_synced=sync_result.bytes_synced, 738 records_synced=sync_result.records_synced, 739 start_time=sync_result.start_time.isoformat(), 740 job_url=sync_result.job_url, 741 ) 742 for sync_result in sync_results 743 ] 744 745 return SyncJobListResult( 746 jobs=jobs, 747 jobs_count=len(jobs), 748 jobs_offset=jobs_offset or 0, 749 from_tail=from_tail, 750 ) 751 752 753@mcp_tool( 754 domain="cloud", 755 read_only=True, 756 idempotent=True, 757 open_world=True, 758 extra_help_text=CLOUD_AUTH_TIP_TEXT, 759) 760def list_deployed_cloud_source_connectors( 761 *, 762 workspace_id: Annotated[ 763 str | None, 764 Field( 765 description=WORKSPACE_ID_TIP_TEXT, 766 default=None, 767 ), 768 ], 769 name_contains: Annotated[ 770 str | None, 771 Field( 772 description="Optional case-insensitive substring to filter sources by name", 773 default=None, 774 ), 775 ], 776 max_items_limit: Annotated[ 777 int | None, 778 Field( 779 description="Optional maximum number of items to return (default: no limit)", 780 default=None, 781 ), 782 ], 783) -> list[CloudSourceResult]: 784 """List all deployed source connectors in the Airbyte Cloud workspace.""" 785 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 786 sources = workspace.list_sources() 787 788 # Filter by name if requested 789 if name_contains: 790 needle = name_contains.lower() 791 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 792 793 # Apply limit if requested 794 if max_items_limit is not None: 795 sources = sources[:max_items_limit] 796 797 # Note: name and url are guaranteed non-null from list API responses 798 return [ 799 CloudSourceResult( 800 id=source.source_id, 801 name=cast(str, source.name), 802 url=cast(str, source.connector_url), 803 ) 804 for source in sources 805 ] 806 807 808@mcp_tool( 809 domain="cloud", 810 read_only=True, 811 idempotent=True, 812 open_world=True, 813 extra_help_text=CLOUD_AUTH_TIP_TEXT, 814) 815def list_deployed_cloud_destination_connectors( 816 *, 817 workspace_id: Annotated[ 818 str | None, 819 Field( 820 description=WORKSPACE_ID_TIP_TEXT, 821 default=None, 822 ), 823 ], 824 name_contains: Annotated[ 825 str | None, 826 Field( 827 description="Optional case-insensitive substring to filter destinations by name", 828 default=None, 829 ), 830 ], 831 max_items_limit: Annotated[ 832 int | None, 833 Field( 834 description="Optional maximum number of items to return (default: no limit)", 835 default=None, 836 ), 837 ], 838) -> list[CloudDestinationResult]: 839 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 840 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 841 destinations = workspace.list_destinations() 842 843 # Filter by name if requested 844 if name_contains: 845 needle = name_contains.lower() 846 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 847 848 # Apply limit if requested 849 if max_items_limit is not None: 850 destinations = destinations[:max_items_limit] 851 852 # Note: name and url are guaranteed non-null from list API responses 853 return [ 854 CloudDestinationResult( 855 id=destination.destination_id, 856 name=cast(str, destination.name), 857 url=cast(str, destination.connector_url), 858 ) 859 for destination in destinations 860 ] 861 862 863@mcp_tool( 864 domain="cloud", 865 read_only=True, 866 idempotent=True, 867 open_world=True, 868 extra_help_text=CLOUD_AUTH_TIP_TEXT, 869) 870def describe_cloud_source( 871 source_id: Annotated[ 872 str, 873 Field(description="The ID of the source to describe."), 874 ], 875 *, 876 workspace_id: Annotated[ 877 str | None, 878 Field( 879 description=WORKSPACE_ID_TIP_TEXT, 880 default=None, 881 ), 882 ], 883) -> CloudSourceDetails: 884 """Get detailed information about a specific deployed source connector.""" 885 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 886 source = workspace.get_source(source_id=source_id) 887 888 # Access name property to ensure _connector_info is populated 889 source_name = cast(str, source.name) 890 891 return CloudSourceDetails( 892 source_id=source.source_id, 893 source_name=source_name, 894 source_url=source.connector_url, 895 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 896 ) 897 898 899@mcp_tool( 900 domain="cloud", 901 read_only=True, 902 idempotent=True, 903 open_world=True, 904 extra_help_text=CLOUD_AUTH_TIP_TEXT, 905) 906def describe_cloud_destination( 907 destination_id: Annotated[ 908 str, 909 Field(description="The ID of the destination to describe."), 910 ], 911 *, 912 workspace_id: Annotated[ 913 str | None, 914 Field( 915 description=WORKSPACE_ID_TIP_TEXT, 916 default=None, 917 ), 918 ], 919) -> CloudDestinationDetails: 920 """Get detailed information about a specific deployed destination connector.""" 921 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 922 destination = workspace.get_destination(destination_id=destination_id) 923 924 # Access name property to ensure _connector_info is populated 925 destination_name = cast(str, destination.name) 926 927 return CloudDestinationDetails( 928 destination_id=destination.destination_id, 929 destination_name=destination_name, 930 destination_url=destination.connector_url, 931 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 932 ) 933 934 935@mcp_tool( 936 domain="cloud", 937 read_only=True, 938 idempotent=True, 939 open_world=True, 940 extra_help_text=CLOUD_AUTH_TIP_TEXT, 941) 942def describe_cloud_connection( 943 connection_id: Annotated[ 944 str, 945 Field(description="The ID of the connection to describe."), 946 ], 947 *, 948 workspace_id: Annotated[ 949 str | None, 950 Field( 951 description=WORKSPACE_ID_TIP_TEXT, 952 default=None, 953 ), 954 ], 955) -> CloudConnectionDetails: 956 """Get detailed information about a specific deployed connection.""" 957 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 958 connection = workspace.get_connection(connection_id=connection_id) 959 960 return CloudConnectionDetails( 961 connection_id=connection.connection_id, 962 connection_name=cast(str, connection.name), 963 connection_url=cast(str, connection.connection_url), 964 source_id=connection.source_id, 965 source_name=cast(str, connection.source.name), 966 destination_id=connection.destination_id, 967 destination_name=cast(str, connection.destination.name), 968 selected_streams=connection.stream_names, 969 table_prefix=connection.table_prefix, 970 ) 971 972 973@mcp_tool( 974 domain="cloud", 975 read_only=True, 976 idempotent=True, 977 open_world=True, 978 extra_help_text=CLOUD_AUTH_TIP_TEXT, 979) 980def get_cloud_sync_logs( 981 connection_id: Annotated[ 982 str, 983 Field(description="The ID of the Airbyte Cloud connection."), 984 ], 985 job_id: Annotated[ 986 int | None, 987 Field(description="Optional job ID. If not provided, the latest job will be used."), 988 ] = None, 989 attempt_number: Annotated[ 990 int | None, 991 Field( 992 description="Optional attempt number. If not provided, the latest attempt will be used." 993 ), 994 ] = None, 995 *, 996 workspace_id: Annotated[ 997 str | None, 998 Field( 999 description=WORKSPACE_ID_TIP_TEXT, 1000 default=None, 1001 ), 1002 ], 1003 max_lines: Annotated[ 1004 int, 1005 Field( 1006 description=( 1007 "Maximum number of lines to return. " 1008 "Defaults to 4000 if not specified. " 1009 "If '0' is provided, no limit is applied." 1010 ), 1011 default=4000, 1012 ), 1013 ], 1014 from_tail: Annotated[ 1015 bool | None, 1016 Field( 1017 description=( 1018 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 1019 "Defaults to True if `line_offset` is not specified. " 1020 "Cannot combine `from_tail=True` with `line_offset`." 1021 ), 1022 default=None, 1023 ), 1024 ], 1025 line_offset: Annotated[ 1026 int | None, 1027 Field( 1028 description=( 1029 "Number of lines to skip from the beginning of the logs. " 1030 "Cannot be combined with `from_tail=True`." 1031 ), 1032 default=None, 1033 ), 1034 ], 1035) -> LogReadResult: 1036 """Get the logs from a sync job attempt on Airbyte Cloud.""" 1037 # Validate that line_offset and from_tail are not both set 1038 if line_offset is not None and from_tail: 1039 raise PyAirbyteInputError( 1040 message="Cannot specify both 'line_offset' and 'from_tail' parameters.", 1041 context={"line_offset": line_offset, "from_tail": from_tail}, 1042 ) 1043 1044 if from_tail is None and line_offset is None: 1045 from_tail = True 1046 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1047 connection = workspace.get_connection(connection_id=connection_id) 1048 1049 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 1050 1051 if not sync_result: 1052 raise AirbyteMissingResourceError( 1053 resource_type="sync job", 1054 resource_name_or_id=connection_id, 1055 ) 1056 1057 attempts = sync_result.get_attempts() 1058 1059 if not attempts: 1060 raise AirbyteMissingResourceError( 1061 resource_type="sync attempt", 1062 resource_name_or_id=str(sync_result.job_id), 1063 ) 1064 1065 if attempt_number is not None: 1066 target_attempt = None 1067 for attempt in attempts: 1068 if attempt.attempt_number == attempt_number: 1069 target_attempt = attempt 1070 break 1071 1072 if target_attempt is None: 1073 raise AirbyteMissingResourceError( 1074 resource_type="sync attempt", 1075 resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}", 1076 ) 1077 else: 1078 target_attempt = max(attempts, key=lambda a: a.attempt_number) 1079 1080 logs = target_attempt.get_full_log_text() 1081 1082 if not logs: 1083 # Return empty result with zero lines 1084 return LogReadResult( 1085 log_text=( 1086 f"[No logs available for job '{sync_result.job_id}', " 1087 f"attempt {target_attempt.attempt_number}.]" 1088 ), 1089 log_text_start_line=1, 1090 log_text_line_count=0, 1091 total_log_lines_available=0, 1092 job_id=sync_result.job_id, 1093 attempt_number=target_attempt.attempt_number, 1094 ) 1095 1096 # Apply line limiting 1097 log_lines = logs.splitlines() 1098 total_lines = len(log_lines) 1099 1100 # Determine effective max_lines (0 means no limit) 1101 effective_max = total_lines if max_lines == 0 else max_lines 1102 1103 # Calculate start_index and slice based on from_tail or line_offset 1104 if from_tail: 1105 start_index = max(0, total_lines - effective_max) 1106 selected_lines = log_lines[start_index:][:effective_max] 1107 else: 1108 start_index = line_offset or 0 1109 selected_lines = log_lines[start_index : start_index + effective_max] 1110 1111 return LogReadResult( 1112 log_text="\n".join(selected_lines), 1113 log_text_start_line=start_index + 1, # Convert to 1-based index 1114 log_text_line_count=len(selected_lines), 1115 total_log_lines_available=total_lines, 1116 job_id=sync_result.job_id, 1117 attempt_number=target_attempt.attempt_number, 1118 ) 1119 1120 1121@mcp_tool( 1122 domain="cloud", 1123 read_only=True, 1124 idempotent=True, 1125 open_world=True, 1126 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1127) 1128def list_deployed_cloud_connections( 1129 *, 1130 workspace_id: Annotated[ 1131 str | None, 1132 Field( 1133 description=WORKSPACE_ID_TIP_TEXT, 1134 default=None, 1135 ), 1136 ], 1137 name_contains: Annotated[ 1138 str | None, 1139 Field( 1140 description="Optional case-insensitive substring to filter connections by name", 1141 default=None, 1142 ), 1143 ], 1144 max_items_limit: Annotated[ 1145 int | None, 1146 Field( 1147 description="Optional maximum number of items to return (default: no limit)", 1148 default=None, 1149 ), 1150 ], 1151 with_connection_status: Annotated[ 1152 bool | None, 1153 Field( 1154 description="If True, include status info for each connection's most recent sync job", 1155 default=False, 1156 ), 1157 ], 1158 failing_connections_only: Annotated[ 1159 bool | None, 1160 Field( 1161 description="If True, only return connections with failed/cancelled last sync", 1162 default=False, 1163 ), 1164 ], 1165) -> list[CloudConnectionResult]: 1166 """List all deployed connections in the Airbyte Cloud workspace. 1167 1168 When with_connection_status is True, each connection result will include 1169 information about the most recent sync job status, skipping over any 1170 currently in-progress syncs to find the last completed job. 1171 1172 When failing_connections_only is True, only connections where the most 1173 recent completed sync job failed or was cancelled will be returned. 1174 This implicitly enables with_connection_status. 1175 """ 1176 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1177 connections = workspace.list_connections() 1178 1179 # Filter by name if requested 1180 if name_contains: 1181 needle = name_contains.lower() 1182 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1183 1184 # If failing_connections_only is True, implicitly enable with_connection_status 1185 if failing_connections_only: 1186 with_connection_status = True 1187 1188 results: list[CloudConnectionResult] = [] 1189 1190 for connection in connections: 1191 last_job_status: str | None = None 1192 last_job_id: int | None = None 1193 last_job_time: str | None = None 1194 currently_running_job_id: int | None = None 1195 currently_running_job_start_time: str | None = None 1196 1197 if with_connection_status: 1198 sync_logs = connection.get_previous_sync_logs(limit=5) 1199 last_completed_job_status = None # Keep enum for comparison 1200 1201 for sync_result in sync_logs: 1202 job_status = sync_result.get_job_status() 1203 1204 if not sync_result.is_job_complete(): 1205 currently_running_job_id = sync_result.job_id 1206 currently_running_job_start_time = sync_result.start_time.isoformat() 1207 continue 1208 1209 last_completed_job_status = job_status 1210 last_job_status = str(job_status.value) if job_status else None 1211 last_job_id = sync_result.job_id 1212 last_job_time = sync_result.start_time.isoformat() 1213 break 1214 1215 if failing_connections_only and ( 1216 last_completed_job_status is None 1217 or last_completed_job_status not in FAILED_STATUSES 1218 ): 1219 continue 1220 1221 results.append( 1222 CloudConnectionResult( 1223 id=connection.connection_id, 1224 name=cast(str, connection.name), 1225 url=cast(str, connection.connection_url), 1226 source_id=connection.source_id, 1227 destination_id=connection.destination_id, 1228 last_job_status=last_job_status, 1229 last_job_id=last_job_id, 1230 last_job_time=last_job_time, 1231 currently_running_job_id=currently_running_job_id, 1232 currently_running_job_start_time=currently_running_job_start_time, 1233 ) 1234 ) 1235 1236 if max_items_limit is not None and len(results) >= max_items_limit: 1237 break 1238 1239 return results 1240 1241 1242def _resolve_organization( 1243 organization_id: str | None, 1244 organization_name: str | None, 1245 *, 1246 api_root: str, 1247 client_id: SecretString | None, 1248 client_secret: SecretString | None, 1249 bearer_token: SecretString | None = None, 1250) -> api_util.models.OrganizationResponse: 1251 """Resolve organization from either ID or exact name match. 1252 1253 Args: 1254 organization_id: The organization ID (if provided directly) 1255 organization_name: The organization name (exact match required) 1256 api_root: The API root URL 1257 client_id: OAuth client ID (optional if bearer_token is provided) 1258 client_secret: OAuth client secret (optional if bearer_token is provided) 1259 bearer_token: Bearer token for authentication (optional if client credentials provided) 1260 1261 Returns: 1262 The resolved OrganizationResponse object 1263 1264 Raises: 1265 PyAirbyteInputError: If neither or both parameters are provided, 1266 or if no organization matches the exact name 1267 AirbyteMissingResourceError: If the organization is not found 1268 """ 1269 if organization_id and organization_name: 1270 raise PyAirbyteInputError( 1271 message="Provide either 'organization_id' or 'organization_name', not both." 1272 ) 1273 if not organization_id and not organization_name: 1274 raise PyAirbyteInputError( 1275 message="Either 'organization_id' or 'organization_name' must be provided." 1276 ) 1277 1278 # Get all organizations for the user 1279 orgs = api_util.list_organizations_for_user( 1280 api_root=api_root, 1281 client_id=client_id, 1282 client_secret=client_secret, 1283 bearer_token=bearer_token, 1284 ) 1285 1286 if organization_id: 1287 # Find by ID 1288 matching_orgs = [org for org in orgs if org.organization_id == organization_id] 1289 if not matching_orgs: 1290 raise AirbyteMissingResourceError( 1291 resource_type="organization", 1292 context={ 1293 "organization_id": organization_id, 1294 "message": f"No organization found with ID '{organization_id}' " 1295 "for the current user.", 1296 }, 1297 ) 1298 return matching_orgs[0] 1299 1300 # Find by exact name match (case-sensitive) 1301 matching_orgs = [org for org in orgs if org.organization_name == organization_name] 1302 1303 if not matching_orgs: 1304 raise AirbyteMissingResourceError( 1305 resource_type="organization", 1306 context={ 1307 "organization_name": organization_name, 1308 "message": f"No organization found with exact name '{organization_name}' " 1309 "for the current user.", 1310 }, 1311 ) 1312 1313 if len(matching_orgs) > 1: 1314 raise PyAirbyteInputError( 1315 message=f"Multiple organizations found with name '{organization_name}'. " 1316 "Please use 'organization_id' instead to specify the exact organization." 1317 ) 1318 1319 return matching_orgs[0] 1320 1321 1322def _resolve_organization_id( 1323 organization_id: str | None, 1324 organization_name: str | None, 1325 *, 1326 api_root: str, 1327 client_id: SecretString | None, 1328 client_secret: SecretString | None, 1329 bearer_token: SecretString | None = None, 1330) -> str: 1331 """Resolve organization ID from either ID or exact name match. 1332 1333 This is a convenience wrapper around _resolve_organization that returns just the ID. 1334 """ 1335 org = _resolve_organization( 1336 organization_id=organization_id, 1337 organization_name=organization_name, 1338 api_root=api_root, 1339 client_id=client_id, 1340 client_secret=client_secret, 1341 bearer_token=bearer_token, 1342 ) 1343 return org.organization_id 1344 1345 1346@mcp_tool( 1347 domain="cloud", 1348 read_only=True, 1349 idempotent=True, 1350 open_world=True, 1351 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1352) 1353def list_cloud_workspaces( 1354 *, 1355 organization_id: Annotated[ 1356 str | None, 1357 Field( 1358 description="Organization ID. Required if organization_name is not provided.", 1359 default=None, 1360 ), 1361 ], 1362 organization_name: Annotated[ 1363 str | None, 1364 Field( 1365 description=( 1366 "Organization name (exact match). " "Required if organization_id is not provided." 1367 ), 1368 default=None, 1369 ), 1370 ], 1371 name_contains: Annotated[ 1372 str | None, 1373 Field( 1374 description="Optional substring to filter workspaces by name (server-side filtering)", 1375 default=None, 1376 ), 1377 ], 1378 max_items_limit: Annotated[ 1379 int | None, 1380 Field( 1381 description="Optional maximum number of items to return (default: no limit)", 1382 default=None, 1383 ), 1384 ], 1385) -> list[CloudWorkspaceResult]: 1386 """List all workspaces in a specific organization. 1387 1388 Requires either organization_id OR organization_name (exact match) to be provided. 1389 This tool will NOT list workspaces across all organizations - you must specify 1390 which organization to list workspaces from. 1391 """ 1392 credentials = resolve_cloud_credentials() 1393 1394 resolved_org_id = _resolve_organization_id( 1395 organization_id=organization_id, 1396 organization_name=organization_name, 1397 api_root=credentials.api_root, 1398 client_id=credentials.client_id, 1399 client_secret=credentials.client_secret, 1400 bearer_token=credentials.bearer_token, 1401 ) 1402 1403 workspaces = api_util.list_workspaces_in_organization( 1404 organization_id=resolved_org_id, 1405 api_root=credentials.api_root, 1406 client_id=credentials.client_id, 1407 client_secret=credentials.client_secret, 1408 bearer_token=credentials.bearer_token, 1409 name_contains=name_contains, 1410 max_items_limit=max_items_limit, 1411 ) 1412 1413 return [ 1414 CloudWorkspaceResult( 1415 workspace_id=ws.get("workspaceId", ""), 1416 workspace_name=ws.get("name", ""), 1417 organization_id=ws.get("organizationId", ""), 1418 ) 1419 for ws in workspaces 1420 ] 1421 1422 1423@mcp_tool( 1424 domain="cloud", 1425 read_only=True, 1426 idempotent=True, 1427 open_world=True, 1428 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1429) 1430def describe_cloud_organization( 1431 *, 1432 organization_id: Annotated[ 1433 str | None, 1434 Field( 1435 description="Organization ID. Required if organization_name is not provided.", 1436 default=None, 1437 ), 1438 ], 1439 organization_name: Annotated[ 1440 str | None, 1441 Field( 1442 description=( 1443 "Organization name (exact match). " "Required if organization_id is not provided." 1444 ), 1445 default=None, 1446 ), 1447 ], 1448) -> CloudOrganizationResult: 1449 """Get details about a specific organization. 1450 1451 Requires either organization_id OR organization_name (exact match) to be provided. 1452 This tool is useful for looking up an organization's ID from its name, or vice versa. 1453 """ 1454 credentials = resolve_cloud_credentials() 1455 1456 org = _resolve_organization( 1457 organization_id=organization_id, 1458 organization_name=organization_name, 1459 api_root=credentials.api_root, 1460 client_id=credentials.client_id, 1461 client_secret=credentials.client_secret, 1462 bearer_token=credentials.bearer_token, 1463 ) 1464 1465 return CloudOrganizationResult( 1466 id=org.organization_id, 1467 name=org.organization_name, 1468 email=org.email, 1469 ) 1470 1471 1472def _get_custom_source_definition_description( 1473 custom_source: CustomCloudSourceDefinition, 1474) -> str: 1475 return "\n".join( 1476 [ 1477 f" - Custom Source Name: {custom_source.name}", 1478 f" - Definition ID: {custom_source.definition_id}", 1479 f" - Definition Version: {custom_source.version}", 1480 f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}", 1481 f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}", 1482 ] 1483 ) 1484 1485 1486@mcp_tool( 1487 domain="cloud", 1488 open_world=True, 1489 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1490) 1491def publish_custom_source_definition( 1492 name: Annotated[ 1493 str, 1494 Field(description="The name for the custom connector definition."), 1495 ], 1496 *, 1497 workspace_id: Annotated[ 1498 str | None, 1499 Field( 1500 description=WORKSPACE_ID_TIP_TEXT, 1501 default=None, 1502 ), 1503 ], 1504 manifest_yaml: Annotated[ 1505 str | Path | None, 1506 Field( 1507 description=( 1508 "The Low-code CDK manifest as a YAML string or file path. " 1509 "Required for YAML connectors." 1510 ), 1511 default=None, 1512 ), 1513 ] = None, 1514 unique: Annotated[ 1515 bool, 1516 Field( 1517 description="Whether to require a unique name.", 1518 default=True, 1519 ), 1520 ] = True, 1521 pre_validate: Annotated[ 1522 bool, 1523 Field( 1524 description="Whether to validate the manifest client-side before publishing.", 1525 default=True, 1526 ), 1527 ] = True, 1528 testing_values: Annotated[ 1529 dict | str | None, 1530 Field( 1531 description=( 1532 "Optional testing configuration values for the Builder UI. " 1533 "Can be provided as a JSON object or JSON string. " 1534 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1535 "If provided, these values replace any existing testing values " 1536 "for the connector builder project, allowing immediate test read operations." 1537 ), 1538 default=None, 1539 ), 1540 ], 1541 testing_values_secret_name: Annotated[ 1542 str | None, 1543 Field( 1544 description=( 1545 "Optional name of a secret containing testing configuration values " 1546 "in JSON or YAML format. The secret will be resolved by the MCP " 1547 "server and merged into testing_values, with secret values taking " 1548 "precedence. This lets the agent reference secrets without sending " 1549 "raw values as tool arguments." 1550 ), 1551 default=None, 1552 ), 1553 ], 1554) -> str: 1555 """Publish a custom YAML source connector definition to Airbyte Cloud. 1556 1557 Note: Only YAML (declarative) connectors are currently supported. 1558 Docker-based custom sources are not yet available. 1559 """ 1560 processed_manifest = manifest_yaml 1561 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1562 processed_manifest = Path(manifest_yaml) 1563 1564 # Resolve testing values from inline config and/or secret 1565 testing_values_dict: dict[str, Any] | None = None 1566 if testing_values is not None or testing_values_secret_name is not None: 1567 testing_values_dict = ( 1568 resolve_config( 1569 config=testing_values, 1570 config_secret_name=testing_values_secret_name, 1571 ) 1572 or None 1573 ) 1574 1575 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1576 custom_source = workspace.publish_custom_source_definition( 1577 name=name, 1578 manifest_yaml=processed_manifest, 1579 unique=unique, 1580 pre_validate=pre_validate, 1581 testing_values=testing_values_dict, 1582 ) 1583 register_guid_created_in_session(custom_source.definition_id) 1584 return ( 1585 "Successfully published custom YAML source definition:\n" 1586 + _get_custom_source_definition_description( 1587 custom_source=custom_source, 1588 ) 1589 + "\n" 1590 ) 1591 1592 1593@mcp_tool( 1594 domain="cloud", 1595 read_only=True, 1596 idempotent=True, 1597 open_world=True, 1598) 1599def list_custom_source_definitions( 1600 *, 1601 workspace_id: Annotated[ 1602 str | None, 1603 Field( 1604 description=WORKSPACE_ID_TIP_TEXT, 1605 default=None, 1606 ), 1607 ], 1608) -> list[dict[str, Any]]: 1609 """List custom YAML source definitions in the Airbyte Cloud workspace. 1610 1611 Note: Only YAML (declarative) connectors are currently supported. 1612 Docker-based custom sources are not yet available. 1613 """ 1614 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1615 definitions = workspace.list_custom_source_definitions( 1616 definition_type="yaml", 1617 ) 1618 1619 return [ 1620 { 1621 "definition_id": d.definition_id, 1622 "name": d.name, 1623 "version": d.version, 1624 "connector_builder_project_url": d.connector_builder_project_url, 1625 } 1626 for d in definitions 1627 ] 1628 1629 1630@mcp_tool( 1631 domain="cloud", 1632 read_only=True, 1633 idempotent=True, 1634 open_world=True, 1635) 1636def get_custom_source_definition( 1637 definition_id: Annotated[ 1638 str, 1639 Field(description="The ID of the custom source definition to retrieve."), 1640 ], 1641 *, 1642 workspace_id: Annotated[ 1643 str | None, 1644 Field( 1645 description=WORKSPACE_ID_TIP_TEXT, 1646 default=None, 1647 ), 1648 ], 1649) -> dict[str, Any]: 1650 """Get a custom YAML source definition from Airbyte Cloud, including its manifest. 1651 1652 Returns the full definition details including the manifest YAML content, 1653 which can be used to inspect or store the connector configuration locally. 1654 1655 Note: Only YAML (declarative) connectors are currently supported. 1656 Docker-based custom sources are not yet available. 1657 """ 1658 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1659 definition = workspace.get_custom_source_definition( 1660 definition_id=definition_id, 1661 definition_type="yaml", 1662 ) 1663 1664 return { 1665 "definition_id": definition.definition_id, 1666 "name": definition.name, 1667 "version": definition.version, 1668 "connector_builder_project_id": definition.connector_builder_project_id, 1669 "connector_builder_project_url": definition.connector_builder_project_url, 1670 "manifest": definition.manifest, 1671 } 1672 1673 1674@mcp_tool( 1675 domain="cloud", 1676 destructive=True, 1677 open_world=True, 1678) 1679def update_custom_source_definition( 1680 definition_id: Annotated[ 1681 str, 1682 Field(description="The ID of the definition to update."), 1683 ], 1684 manifest_yaml: Annotated[ 1685 str | Path | None, 1686 Field( 1687 description=( 1688 "New manifest as YAML string or file path. " 1689 "Optional; omit to update only testing values." 1690 ), 1691 default=None, 1692 ), 1693 ] = None, 1694 *, 1695 workspace_id: Annotated[ 1696 str | None, 1697 Field( 1698 description=WORKSPACE_ID_TIP_TEXT, 1699 default=None, 1700 ), 1701 ], 1702 pre_validate: Annotated[ 1703 bool, 1704 Field( 1705 description="Whether to validate the manifest client-side before updating.", 1706 default=True, 1707 ), 1708 ] = True, 1709 testing_values: Annotated[ 1710 dict | str | None, 1711 Field( 1712 description=( 1713 "Optional testing configuration values for the Builder UI. " 1714 "Can be provided as a JSON object or JSON string. " 1715 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1716 "If provided, these values replace any existing testing values " 1717 "for the connector builder project. The entire testing values object " 1718 "is overwritten, so pass the full set of values you want to persist." 1719 ), 1720 default=None, 1721 ), 1722 ], 1723 testing_values_secret_name: Annotated[ 1724 str | None, 1725 Field( 1726 description=( 1727 "Optional name of a secret containing testing configuration values " 1728 "in JSON or YAML format. The secret will be resolved by the MCP " 1729 "server and merged into testing_values, with secret values taking " 1730 "precedence. This lets the agent reference secrets without sending " 1731 "raw values as tool arguments." 1732 ), 1733 default=None, 1734 ), 1735 ], 1736) -> str: 1737 """Update a custom YAML source definition in Airbyte Cloud. 1738 1739 Updates the manifest and/or testing values for an existing custom source definition. 1740 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1741 """ 1742 check_guid_created_in_session(definition_id) 1743 1744 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1745 1746 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1747 raise PyAirbyteInputError( 1748 message=( 1749 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1750 "must be provided to update a custom source definition." 1751 ), 1752 context={ 1753 "definition_id": definition_id, 1754 "workspace_id": workspace.workspace_id, 1755 }, 1756 ) 1757 1758 processed_manifest: str | Path | None = manifest_yaml 1759 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1760 processed_manifest = Path(manifest_yaml) 1761 1762 # Resolve testing values from inline config and/or secret 1763 testing_values_dict: dict[str, Any] | None = None 1764 if testing_values is not None or testing_values_secret_name is not None: 1765 testing_values_dict = ( 1766 resolve_config( 1767 config=testing_values, 1768 config_secret_name=testing_values_secret_name, 1769 ) 1770 or None 1771 ) 1772 1773 definition = workspace.get_custom_source_definition( 1774 definition_id=definition_id, 1775 definition_type="yaml", 1776 ) 1777 custom_source: CustomCloudSourceDefinition = definition 1778 1779 if processed_manifest is not None: 1780 custom_source = definition.update_definition( 1781 manifest_yaml=processed_manifest, 1782 pre_validate=pre_validate, 1783 ) 1784 1785 if testing_values_dict is not None: 1786 custom_source.set_testing_values(testing_values_dict) 1787 1788 return ( 1789 "Successfully updated custom YAML source definition:\n" 1790 + _get_custom_source_definition_description( 1791 custom_source=custom_source, 1792 ) 1793 ) 1794 1795 1796@mcp_tool( 1797 domain="cloud", 1798 destructive=True, 1799 open_world=True, 1800) 1801def permanently_delete_custom_source_definition( 1802 definition_id: Annotated[ 1803 str, 1804 Field(description="The ID of the custom source definition to delete."), 1805 ], 1806 name: Annotated[ 1807 str, 1808 Field(description="The expected name of the custom source definition (for verification)."), 1809 ], 1810 *, 1811 workspace_id: Annotated[ 1812 str | None, 1813 Field( 1814 description=WORKSPACE_ID_TIP_TEXT, 1815 default=None, 1816 ), 1817 ], 1818) -> str: 1819 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1820 1821 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1822 (case insensitive). 1823 1824 If the connector does not meet this requirement, the deletion will be rejected with a 1825 helpful error message. Instruct the user to rename the connector appropriately to authorize 1826 the deletion. 1827 1828 The provided name must match the actual name of the definition for the operation to proceed. 1829 This is a safety measure to ensure you are deleting the correct resource. 1830 1831 Note: Only YAML (declarative) connectors are currently supported. 1832 Docker-based custom sources are not yet available. 1833 """ 1834 check_guid_created_in_session(definition_id) 1835 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1836 definition = workspace.get_custom_source_definition( 1837 definition_id=definition_id, 1838 definition_type="yaml", 1839 ) 1840 actual_name: str = definition.name 1841 1842 # Verify the name matches 1843 if actual_name != name: 1844 raise PyAirbyteInputError( 1845 message=( 1846 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1847 "The provided name must exactly match the definition's actual name. " 1848 "This is a safety measure to prevent accidental deletion." 1849 ), 1850 context={ 1851 "definition_id": definition_id, 1852 "expected_name": name, 1853 "actual_name": actual_name, 1854 }, 1855 ) 1856 1857 definition.permanently_delete( 1858 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1859 ) 1860 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})" 1861 1862 1863@mcp_tool( 1864 domain="cloud", 1865 destructive=True, 1866 open_world=True, 1867 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1868) 1869def permanently_delete_cloud_source( 1870 source_id: Annotated[ 1871 str, 1872 Field(description="The ID of the deployed source to delete."), 1873 ], 1874 name: Annotated[ 1875 str, 1876 Field(description="The expected name of the source (for verification)."), 1877 ], 1878) -> str: 1879 """Permanently delete a deployed source connector from Airbyte Cloud. 1880 1881 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 1882 (case insensitive). 1883 1884 If the source does not meet this requirement, the deletion will be rejected with a 1885 helpful error message. Instruct the user to rename the source appropriately to authorize 1886 the deletion. 1887 1888 The provided name must match the actual name of the source for the operation to proceed. 1889 This is a safety measure to ensure you are deleting the correct resource. 1890 """ 1891 check_guid_created_in_session(source_id) 1892 workspace: CloudWorkspace = _get_cloud_workspace() 1893 source = workspace.get_source(source_id=source_id) 1894 actual_name: str = cast(str, source.name) 1895 1896 # Verify the name matches 1897 if actual_name != name: 1898 raise PyAirbyteInputError( 1899 message=( 1900 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1901 "The provided name must exactly match the source's actual name. " 1902 "This is a safety measure to prevent accidental deletion." 1903 ), 1904 context={ 1905 "source_id": source_id, 1906 "expected_name": name, 1907 "actual_name": actual_name, 1908 }, 1909 ) 1910 1911 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1912 workspace.permanently_delete_source( 1913 source=source_id, 1914 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 1915 ) 1916 return f"Successfully deleted source '{actual_name}' (ID: {source_id})" 1917 1918 1919@mcp_tool( 1920 domain="cloud", 1921 destructive=True, 1922 open_world=True, 1923 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1924) 1925def permanently_delete_cloud_destination( 1926 destination_id: Annotated[ 1927 str, 1928 Field(description="The ID of the deployed destination to delete."), 1929 ], 1930 name: Annotated[ 1931 str, 1932 Field(description="The expected name of the destination (for verification)."), 1933 ], 1934) -> str: 1935 """Permanently delete a deployed destination connector from Airbyte Cloud. 1936 1937 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 1938 (case insensitive). 1939 1940 If the destination does not meet this requirement, the deletion will be rejected with a 1941 helpful error message. Instruct the user to rename the destination appropriately to authorize 1942 the deletion. 1943 1944 The provided name must match the actual name of the destination for the operation to proceed. 1945 This is a safety measure to ensure you are deleting the correct resource. 1946 """ 1947 check_guid_created_in_session(destination_id) 1948 workspace: CloudWorkspace = _get_cloud_workspace() 1949 destination = workspace.get_destination(destination_id=destination_id) 1950 actual_name: str = cast(str, destination.name) 1951 1952 # Verify the name matches 1953 if actual_name != name: 1954 raise PyAirbyteInputError( 1955 message=( 1956 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1957 "The provided name must exactly match the destination's actual name. " 1958 "This is a safety measure to prevent accidental deletion." 1959 ), 1960 context={ 1961 "destination_id": destination_id, 1962 "expected_name": name, 1963 "actual_name": actual_name, 1964 }, 1965 ) 1966 1967 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1968 workspace.permanently_delete_destination( 1969 destination=destination_id, 1970 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 1971 ) 1972 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})" 1973 1974 1975@mcp_tool( 1976 domain="cloud", 1977 destructive=True, 1978 open_world=True, 1979 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1980) 1981def permanently_delete_cloud_connection( 1982 connection_id: Annotated[ 1983 str, 1984 Field(description="The ID of the connection to delete."), 1985 ], 1986 name: Annotated[ 1987 str, 1988 Field(description="The expected name of the connection (for verification)."), 1989 ], 1990 *, 1991 cascade_delete_source: Annotated[ 1992 bool, 1993 Field( 1994 description=( 1995 "Whether to also delete the source connector associated with this connection." 1996 ), 1997 default=False, 1998 ), 1999 ] = False, 2000 cascade_delete_destination: Annotated[ 2001 bool, 2002 Field( 2003 description=( 2004 "Whether to also delete the destination connector associated with this connection." 2005 ), 2006 default=False, 2007 ), 2008 ] = False, 2009) -> str: 2010 """Permanently delete a connection from Airbyte Cloud. 2011 2012 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 2013 (case insensitive). 2014 2015 If the connection does not meet this requirement, the deletion will be rejected with a 2016 helpful error message. Instruct the user to rename the connection appropriately to authorize 2017 the deletion. 2018 2019 The provided name must match the actual name of the connection for the operation to proceed. 2020 This is a safety measure to ensure you are deleting the correct resource. 2021 """ 2022 check_guid_created_in_session(connection_id) 2023 workspace: CloudWorkspace = _get_cloud_workspace() 2024 connection = workspace.get_connection(connection_id=connection_id) 2025 actual_name: str = cast(str, connection.name) 2026 2027 # Verify the name matches 2028 if actual_name != name: 2029 raise PyAirbyteInputError( 2030 message=( 2031 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2032 "The provided name must exactly match the connection's actual name. " 2033 "This is a safety measure to prevent accidental deletion." 2034 ), 2035 context={ 2036 "connection_id": connection_id, 2037 "expected_name": name, 2038 "actual_name": actual_name, 2039 }, 2040 ) 2041 2042 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2043 workspace.permanently_delete_connection( 2044 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2045 connection=connection_id, 2046 cascade_delete_source=cascade_delete_source, 2047 cascade_delete_destination=cascade_delete_destination, 2048 ) 2049 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" 2050 2051 2052@mcp_tool( 2053 domain="cloud", 2054 open_world=True, 2055 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2056) 2057def rename_cloud_source( 2058 source_id: Annotated[ 2059 str, 2060 Field(description="The ID of the deployed source to rename."), 2061 ], 2062 name: Annotated[ 2063 str, 2064 Field(description="New name for the source."), 2065 ], 2066 *, 2067 workspace_id: Annotated[ 2068 str | None, 2069 Field( 2070 description=WORKSPACE_ID_TIP_TEXT, 2071 default=None, 2072 ), 2073 ], 2074) -> str: 2075 """Rename a deployed source connector on Airbyte Cloud.""" 2076 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2077 source = workspace.get_source(source_id=source_id) 2078 source.rename(name=name) 2079 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}" 2080 2081 2082@mcp_tool( 2083 domain="cloud", 2084 destructive=True, 2085 open_world=True, 2086 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2087) 2088def update_cloud_source_config( 2089 source_id: Annotated[ 2090 str, 2091 Field(description="The ID of the deployed source to update."), 2092 ], 2093 config: Annotated[ 2094 dict | str, 2095 Field( 2096 description="New configuration for the source connector.", 2097 ), 2098 ], 2099 config_secret_name: Annotated[ 2100 str | None, 2101 Field( 2102 description="The name of the secret containing the configuration.", 2103 default=None, 2104 ), 2105 ] = None, 2106 *, 2107 workspace_id: Annotated[ 2108 str | None, 2109 Field( 2110 description=WORKSPACE_ID_TIP_TEXT, 2111 default=None, 2112 ), 2113 ], 2114) -> str: 2115 """Update a deployed source connector's configuration on Airbyte Cloud. 2116 2117 This is a destructive operation that can break existing connections if the 2118 configuration is changed incorrectly. Use with caution. 2119 """ 2120 check_guid_created_in_session(source_id) 2121 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2122 source = workspace.get_source(source_id=source_id) 2123 2124 config_dict = resolve_config( 2125 config=config, 2126 config_secret_name=config_secret_name, 2127 config_spec_jsonschema=None, # We don't have the spec here 2128 ) 2129 2130 source.update_config(config=config_dict) 2131 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}" 2132 2133 2134@mcp_tool( 2135 domain="cloud", 2136 open_world=True, 2137 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2138) 2139def rename_cloud_destination( 2140 destination_id: Annotated[ 2141 str, 2142 Field(description="The ID of the deployed destination to rename."), 2143 ], 2144 name: Annotated[ 2145 str, 2146 Field(description="New name for the destination."), 2147 ], 2148 *, 2149 workspace_id: Annotated[ 2150 str | None, 2151 Field( 2152 description=WORKSPACE_ID_TIP_TEXT, 2153 default=None, 2154 ), 2155 ], 2156) -> str: 2157 """Rename a deployed destination connector on Airbyte Cloud.""" 2158 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2159 destination = workspace.get_destination(destination_id=destination_id) 2160 destination.rename(name=name) 2161 return ( 2162 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2163 f"URL: {destination.connector_url}" 2164 ) 2165 2166 2167@mcp_tool( 2168 domain="cloud", 2169 destructive=True, 2170 open_world=True, 2171 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2172) 2173def update_cloud_destination_config( 2174 destination_id: Annotated[ 2175 str, 2176 Field(description="The ID of the deployed destination to update."), 2177 ], 2178 config: Annotated[ 2179 dict | str, 2180 Field( 2181 description="New configuration for the destination connector.", 2182 ), 2183 ], 2184 config_secret_name: Annotated[ 2185 str | None, 2186 Field( 2187 description="The name of the secret containing the configuration.", 2188 default=None, 2189 ), 2190 ], 2191 *, 2192 workspace_id: Annotated[ 2193 str | None, 2194 Field( 2195 description=WORKSPACE_ID_TIP_TEXT, 2196 default=None, 2197 ), 2198 ], 2199) -> str: 2200 """Update a deployed destination connector's configuration on Airbyte Cloud. 2201 2202 This is a destructive operation that can break existing connections if the 2203 configuration is changed incorrectly. Use with caution. 2204 """ 2205 check_guid_created_in_session(destination_id) 2206 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2207 destination = workspace.get_destination(destination_id=destination_id) 2208 2209 config_dict = resolve_config( 2210 config=config, 2211 config_secret_name=config_secret_name, 2212 config_spec_jsonschema=None, # We don't have the spec here 2213 ) 2214 2215 destination.update_config(config=config_dict) 2216 return ( 2217 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2218 ) 2219 2220 2221@mcp_tool( 2222 domain="cloud", 2223 open_world=True, 2224 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2225) 2226def rename_cloud_connection( 2227 connection_id: Annotated[ 2228 str, 2229 Field(description="The ID of the connection to rename."), 2230 ], 2231 name: Annotated[ 2232 str, 2233 Field(description="New name for the connection."), 2234 ], 2235 *, 2236 workspace_id: Annotated[ 2237 str | None, 2238 Field( 2239 description=WORKSPACE_ID_TIP_TEXT, 2240 default=None, 2241 ), 2242 ], 2243) -> str: 2244 """Rename a connection on Airbyte Cloud.""" 2245 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2246 connection = workspace.get_connection(connection_id=connection_id) 2247 connection.rename(name=name) 2248 return ( 2249 f"Successfully renamed connection '{connection_id}' to '{name}'. " 2250 f"URL: {connection.connection_url}" 2251 ) 2252 2253 2254@mcp_tool( 2255 domain="cloud", 2256 destructive=True, 2257 open_world=True, 2258 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2259) 2260def set_cloud_connection_table_prefix( 2261 connection_id: Annotated[ 2262 str, 2263 Field(description="The ID of the connection to update."), 2264 ], 2265 prefix: Annotated[ 2266 str, 2267 Field(description="New table prefix to use when syncing to the destination."), 2268 ], 2269 *, 2270 workspace_id: Annotated[ 2271 str | None, 2272 Field( 2273 description=WORKSPACE_ID_TIP_TEXT, 2274 default=None, 2275 ), 2276 ], 2277) -> str: 2278 """Set the table prefix for a connection on Airbyte Cloud. 2279 2280 This is a destructive operation that can break downstream dependencies if the 2281 table prefix is changed incorrectly. Use with caution. 2282 """ 2283 check_guid_created_in_session(connection_id) 2284 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2285 connection = workspace.get_connection(connection_id=connection_id) 2286 connection.set_table_prefix(prefix=prefix) 2287 return ( 2288 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2289 f"URL: {connection.connection_url}" 2290 ) 2291 2292 2293@mcp_tool( 2294 domain="cloud", 2295 destructive=True, 2296 open_world=True, 2297 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2298) 2299def set_cloud_connection_selected_streams( 2300 connection_id: Annotated[ 2301 str, 2302 Field(description="The ID of the connection to update."), 2303 ], 2304 stream_names: Annotated[ 2305 str | list[str], 2306 Field( 2307 description=( 2308 "The selected stream names to sync within the connection. " 2309 "Must be an explicit stream name or list of streams." 2310 ) 2311 ), 2312 ], 2313 *, 2314 workspace_id: Annotated[ 2315 str | None, 2316 Field( 2317 description=WORKSPACE_ID_TIP_TEXT, 2318 default=None, 2319 ), 2320 ], 2321) -> str: 2322 """Set the selected streams for a connection on Airbyte Cloud. 2323 2324 This is a destructive operation that can break existing connections if the 2325 stream selection is changed incorrectly. Use with caution. 2326 """ 2327 check_guid_created_in_session(connection_id) 2328 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2329 connection = workspace.get_connection(connection_id=connection_id) 2330 2331 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2332 connection.set_selected_streams(stream_names=resolved_streams_list) 2333 2334 return ( 2335 f"Successfully set selected streams for connection '{connection_id}' " 2336 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2337 ) 2338 2339 2340@mcp_tool( 2341 domain="cloud", 2342 open_world=True, 2343 destructive=True, 2344 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2345) 2346def update_cloud_connection( 2347 connection_id: Annotated[ 2348 str, 2349 Field(description="The ID of the connection to update."), 2350 ], 2351 *, 2352 enabled: Annotated[ 2353 bool | None, 2354 Field( 2355 description=( 2356 "Set the connection's enabled status. " 2357 "True enables the connection (status='active'), " 2358 "False disables it (status='inactive'). " 2359 "Leave unset to keep the current status." 2360 ), 2361 default=None, 2362 ), 2363 ], 2364 cron_expression: Annotated[ 2365 str | None, 2366 Field( 2367 description=( 2368 "A cron expression defining when syncs should run. " 2369 "Examples: '0 0 * * *' (daily at midnight UTC), " 2370 "'0 */6 * * *' (every 6 hours), " 2371 "'0 0 * * 0' (weekly on Sunday at midnight UTC). " 2372 "Leave unset to keep the current schedule. " 2373 "Cannot be used together with 'manual_schedule'." 2374 ), 2375 default=None, 2376 ), 2377 ], 2378 manual_schedule: Annotated[ 2379 bool | None, 2380 Field( 2381 description=( 2382 "Set to True to disable automatic syncs (manual scheduling only). " 2383 "Syncs will only run when manually triggered. " 2384 "Cannot be used together with 'cron_expression'." 2385 ), 2386 default=None, 2387 ), 2388 ], 2389 workspace_id: Annotated[ 2390 str | None, 2391 Field( 2392 description=WORKSPACE_ID_TIP_TEXT, 2393 default=None, 2394 ), 2395 ], 2396) -> str: 2397 """Update a connection's settings on Airbyte Cloud. 2398 2399 This tool allows updating multiple connection settings in a single call: 2400 - Enable or disable the connection 2401 - Set a cron schedule for automatic syncs 2402 - Switch to manual scheduling (no automatic syncs) 2403 2404 At least one setting must be provided. The 'cron_expression' and 'manual_schedule' 2405 parameters are mutually exclusive. 2406 """ 2407 check_guid_created_in_session(connection_id) 2408 2409 # Validate that at least one setting is provided 2410 if enabled is None and cron_expression is None and manual_schedule is None: 2411 raise ValueError( 2412 "At least one setting must be provided: 'enabled', 'cron_expression', " 2413 "or 'manual_schedule'." 2414 ) 2415 2416 # Validate mutually exclusive schedule options 2417 if cron_expression is not None and manual_schedule is True: 2418 raise ValueError( 2419 "Cannot specify both 'cron_expression' and 'manual_schedule=True'. " 2420 "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' " 2421 "for manual-only syncs." 2422 ) 2423 2424 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2425 connection = workspace.get_connection(connection_id=connection_id) 2426 2427 changes_made: list[str] = [] 2428 2429 # Apply enabled status change 2430 if enabled is not None: 2431 connection.set_enabled(enabled=enabled) 2432 status_str = "enabled" if enabled else "disabled" 2433 changes_made.append(f"status set to '{status_str}'") 2434 2435 # Apply schedule change 2436 if cron_expression is not None: 2437 connection.set_schedule(cron_expression=cron_expression) 2438 changes_made.append(f"schedule set to '{cron_expression}'") 2439 elif manual_schedule is True: 2440 connection.set_manual_schedule() 2441 changes_made.append("schedule set to 'manual'") 2442 2443 changes_summary = ", ".join(changes_made) 2444 return ( 2445 f"Successfully updated connection '{connection_id}': {changes_summary}. " 2446 f"URL: {connection.connection_url}" 2447 ) 2448 2449 2450@mcp_tool( 2451 domain="cloud", 2452 read_only=True, 2453 idempotent=True, 2454 open_world=True, 2455 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2456) 2457def get_connection_artifact( 2458 connection_id: Annotated[ 2459 str, 2460 Field(description="The ID of the Airbyte Cloud connection."), 2461 ], 2462 artifact_type: Annotated[ 2463 Literal["state", "catalog"], 2464 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2465 ], 2466 *, 2467 workspace_id: Annotated[ 2468 str | None, 2469 Field( 2470 description=WORKSPACE_ID_TIP_TEXT, 2471 default=None, 2472 ), 2473 ], 2474) -> dict[str, Any] | list[dict[str, Any]]: 2475 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2476 2477 Retrieves the specified artifact for a connection: 2478 - 'state': Returns the persisted state for incremental syncs as a list of 2479 stream state objects, or {"ERROR": "..."} if no state is set. 2480 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2481 or {"ERROR": "..."} if not found. 2482 """ 2483 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2484 connection = workspace.get_connection(connection_id=connection_id) 2485 2486 if artifact_type == "state": 2487 result = connection.get_state_artifacts() 2488 if result is None: 2489 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2490 return result 2491 2492 # artifact_type == "catalog" 2493 result = connection.get_catalog_artifact() 2494 if result is None: 2495 return {"ERROR": "No catalog found for this connection"} 2496 return result 2497 2498 2499def register_cloud_ops_tools(app: FastMCP) -> None: 2500 """@private Register tools with the FastMCP app. 2501 2502 This is an internal function and should not be called directly. 2503 2504 Tools are filtered based on mode settings: 2505 - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered 2506 - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive 2507 operations are protected by runtime session checks 2508 """ 2509 register_tools(app, domain="cloud")
41class CloudSourceResult(BaseModel): 42 """Information about a deployed source connector in Airbyte Cloud.""" 43 44 id: str 45 """The source ID.""" 46 name: str 47 """Display name of the source.""" 48 url: str 49 """Web URL for managing this source in Airbyte Cloud."""
Information about a deployed source connector in Airbyte Cloud.
52class CloudDestinationResult(BaseModel): 53 """Information about a deployed destination connector in Airbyte Cloud.""" 54 55 id: str 56 """The destination ID.""" 57 name: str 58 """Display name of the destination.""" 59 url: str 60 """Web URL for managing this destination in Airbyte Cloud."""
Information about a deployed destination connector in Airbyte Cloud.
63class CloudConnectionResult(BaseModel): 64 """Information about a deployed connection in Airbyte Cloud.""" 65 66 id: str 67 """The connection ID.""" 68 name: str 69 """Display name of the connection.""" 70 url: str 71 """Web URL for managing this connection in Airbyte Cloud.""" 72 source_id: str 73 """ID of the source used by this connection.""" 74 destination_id: str 75 """ID of the destination used by this connection.""" 76 last_job_status: str | None = None 77 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 78 Only populated when with_connection_status=True.""" 79 last_job_id: int | None = None 80 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 81 last_job_time: str | None = None 82 """ISO 8601 timestamp of the most recent completed sync. 83 Only populated when with_connection_status=True.""" 84 currently_running_job_id: int | None = None 85 """Job ID of a currently running sync, if any. 86 Only populated when with_connection_status=True.""" 87 currently_running_job_start_time: str | None = None 88 """ISO 8601 timestamp of when the currently running sync started. 89 Only populated when with_connection_status=True."""
Information about a deployed connection in Airbyte Cloud.
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.
Job ID of the most recent completed sync. Only populated when with_connection_status=True.
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.
92class CloudSourceDetails(BaseModel): 93 """Detailed information about a deployed source connector in Airbyte Cloud.""" 94 95 source_id: str 96 """The source ID.""" 97 source_name: str 98 """Display name of the source.""" 99 source_url: str 100 """Web URL for managing this source in Airbyte Cloud.""" 101 connector_definition_id: str 102 """The connector definition ID (e.g., the ID for 'source-postgres')."""
Detailed information about a deployed source connector in Airbyte Cloud.
105class CloudDestinationDetails(BaseModel): 106 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 107 108 destination_id: str 109 """The destination ID.""" 110 destination_name: str 111 """Display name of the destination.""" 112 destination_url: str 113 """Web URL for managing this destination in Airbyte Cloud.""" 114 connector_definition_id: str 115 """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
Detailed information about a deployed destination connector in Airbyte Cloud.
118class CloudConnectionDetails(BaseModel): 119 """Detailed information about a deployed connection in Airbyte Cloud.""" 120 121 connection_id: str 122 """The connection ID.""" 123 connection_name: str 124 """Display name of the connection.""" 125 connection_url: str 126 """Web URL for managing this connection in Airbyte Cloud.""" 127 source_id: str 128 """ID of the source used by this connection.""" 129 source_name: str 130 """Display name of the source.""" 131 destination_id: str 132 """ID of the destination used by this connection.""" 133 destination_name: str 134 """Display name of the destination.""" 135 selected_streams: list[str] 136 """List of stream names selected for syncing.""" 137 table_prefix: str | None 138 """Table prefix applied when syncing to the destination."""
Detailed information about a deployed connection in Airbyte Cloud.
141class CloudOrganizationResult(BaseModel): 142 """Information about an organization in Airbyte Cloud.""" 143 144 id: str 145 """The organization ID.""" 146 name: str 147 """Display name of the organization.""" 148 email: str 149 """Email associated with the organization."""
Information about an organization in Airbyte Cloud.
152class CloudWorkspaceResult(BaseModel): 153 """Information about a workspace in Airbyte Cloud.""" 154 155 workspace_id: str 156 """The workspace ID.""" 157 workspace_name: str 158 """Display name of the workspace.""" 159 workspace_url: str | None = None 160 """URL to access the workspace in Airbyte Cloud.""" 161 organization_id: str 162 """ID of the organization (requires ORGANIZATION_READER permission).""" 163 organization_name: str | None = None 164 """Name of the organization (requires ORGANIZATION_READER permission)."""
Information about a workspace in Airbyte Cloud.
167class LogReadResult(BaseModel): 168 """Result of reading sync logs with pagination support.""" 169 170 job_id: int 171 """The job ID the logs belong to.""" 172 attempt_number: int 173 """The attempt number the logs belong to.""" 174 log_text: str 175 """The string containing the log text we are returning.""" 176 log_text_start_line: int 177 """1-based line index of the first line returned.""" 178 log_text_line_count: int 179 """Count of lines we are returning.""" 180 total_log_lines_available: int 181 """Total number of log lines available, shows if any lines were missed due to the limit."""
Result of reading sync logs with pagination support.
184class SyncJobResult(BaseModel): 185 """Information about a sync job.""" 186 187 job_id: int 188 """The job ID.""" 189 status: str 190 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 191 bytes_synced: int 192 """Number of bytes synced in this job.""" 193 records_synced: int 194 """Number of records synced in this job.""" 195 start_time: str 196 """ISO 8601 timestamp of when the job started.""" 197 job_url: str 198 """URL to view the job in Airbyte Cloud."""
Information about a sync job.
201class SyncJobListResult(BaseModel): 202 """Result of listing sync jobs with pagination support.""" 203 204 jobs: list[SyncJobResult] 205 """List of sync jobs.""" 206 jobs_count: int 207 """Number of jobs returned in this response.""" 208 jobs_offset: int 209 """Offset used for this request (0 if not specified).""" 210 from_tail: bool 211 """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
Result of listing sync jobs with pagination support.
237@mcp_tool( 238 domain="cloud", 239 open_world=True, 240 extra_help_text=CLOUD_AUTH_TIP_TEXT, 241) 242def deploy_source_to_cloud( 243 source_name: Annotated[ 244 str, 245 Field(description="The name to use when deploying the source."), 246 ], 247 source_connector_name: Annotated[ 248 str, 249 Field(description="The name of the source connector (e.g., 'source-faker')."), 250 ], 251 *, 252 workspace_id: Annotated[ 253 str | None, 254 Field( 255 description=WORKSPACE_ID_TIP_TEXT, 256 default=None, 257 ), 258 ], 259 config: Annotated[ 260 dict | str | None, 261 Field( 262 description="The configuration for the source connector.", 263 default=None, 264 ), 265 ], 266 config_secret_name: Annotated[ 267 str | None, 268 Field( 269 description="The name of the secret containing the configuration.", 270 default=None, 271 ), 272 ], 273 unique: Annotated[ 274 bool, 275 Field( 276 description="Whether to require a unique name.", 277 default=True, 278 ), 279 ], 280) -> str: 281 """Deploy a source connector to Airbyte Cloud.""" 282 source = get_source( 283 source_connector_name, 284 no_executor=True, 285 ) 286 config_dict = resolve_config( 287 config=config, 288 config_secret_name=config_secret_name, 289 config_spec_jsonschema=source.config_spec, 290 ) 291 source.set_config(config_dict, validate=True) 292 293 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 294 deployed_source = workspace.deploy_source( 295 name=source_name, 296 source=source, 297 unique=unique, 298 ) 299 300 register_guid_created_in_session(deployed_source.connector_id) 301 return ( 302 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 303 f" and URL: {deployed_source.connector_url}" 304 )
Deploy a source connector to Airbyte Cloud.
307@mcp_tool( 308 domain="cloud", 309 open_world=True, 310 extra_help_text=CLOUD_AUTH_TIP_TEXT, 311) 312def deploy_destination_to_cloud( 313 destination_name: Annotated[ 314 str, 315 Field(description="The name to use when deploying the destination."), 316 ], 317 destination_connector_name: Annotated[ 318 str, 319 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 320 ], 321 *, 322 workspace_id: Annotated[ 323 str | None, 324 Field( 325 description=WORKSPACE_ID_TIP_TEXT, 326 default=None, 327 ), 328 ], 329 config: Annotated[ 330 dict | str | None, 331 Field( 332 description="The configuration for the destination connector.", 333 default=None, 334 ), 335 ], 336 config_secret_name: Annotated[ 337 str | None, 338 Field( 339 description="The name of the secret containing the configuration.", 340 default=None, 341 ), 342 ], 343 unique: Annotated[ 344 bool, 345 Field( 346 description="Whether to require a unique name.", 347 default=True, 348 ), 349 ], 350) -> str: 351 """Deploy a destination connector to Airbyte Cloud.""" 352 destination = get_destination( 353 destination_connector_name, 354 no_executor=True, 355 ) 356 config_dict = resolve_config( 357 config=config, 358 config_secret_name=config_secret_name, 359 config_spec_jsonschema=destination.config_spec, 360 ) 361 destination.set_config(config_dict, validate=True) 362 363 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 364 deployed_destination = workspace.deploy_destination( 365 name=destination_name, 366 destination=destination, 367 unique=unique, 368 ) 369 370 register_guid_created_in_session(deployed_destination.connector_id) 371 return ( 372 f"Successfully deployed destination '{destination_name}' " 373 f"with ID: {deployed_destination.connector_id}" 374 )
Deploy a destination connector to Airbyte Cloud.
377@mcp_tool( 378 domain="cloud", 379 open_world=True, 380 extra_help_text=CLOUD_AUTH_TIP_TEXT, 381) 382def create_connection_on_cloud( 383 connection_name: Annotated[ 384 str, 385 Field(description="The name of the connection."), 386 ], 387 source_id: Annotated[ 388 str, 389 Field(description="The ID of the deployed source."), 390 ], 391 destination_id: Annotated[ 392 str, 393 Field(description="The ID of the deployed destination."), 394 ], 395 selected_streams: Annotated[ 396 str | list[str], 397 Field( 398 description=( 399 "The selected stream names to sync within the connection. " 400 "Must be an explicit stream name or list of streams. " 401 "Cannot be empty or '*'." 402 ) 403 ), 404 ], 405 *, 406 workspace_id: Annotated[ 407 str | None, 408 Field( 409 description=WORKSPACE_ID_TIP_TEXT, 410 default=None, 411 ), 412 ], 413 table_prefix: Annotated[ 414 str | None, 415 Field( 416 description="Optional table prefix to use when syncing to the destination.", 417 default=None, 418 ), 419 ], 420) -> str: 421 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 422 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 423 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 424 deployed_connection = workspace.deploy_connection( 425 connection_name=connection_name, 426 source=source_id, 427 destination=destination_id, 428 selected_streams=resolved_streams_list, 429 table_prefix=table_prefix, 430 ) 431 432 register_guid_created_in_session(deployed_connection.connection_id) 433 return ( 434 f"Successfully created connection '{connection_name}' " 435 f"with ID '{deployed_connection.connection_id}' and " 436 f"URL: {deployed_connection.connection_url}" 437 )
Create a connection between a deployed source and destination on Airbyte Cloud.
440@mcp_tool( 441 domain="cloud", 442 open_world=True, 443 extra_help_text=CLOUD_AUTH_TIP_TEXT, 444) 445def run_cloud_sync( 446 connection_id: Annotated[ 447 str, 448 Field(description="The ID of the Airbyte Cloud connection."), 449 ], 450 *, 451 workspace_id: Annotated[ 452 str | None, 453 Field( 454 description=WORKSPACE_ID_TIP_TEXT, 455 default=None, 456 ), 457 ], 458 wait: Annotated[ 459 bool, 460 Field( 461 description=( 462 "Whether to wait for the sync to complete. Since a sync can take between several " 463 "minutes and several hours, this option is not recommended for most " 464 "scenarios." 465 ), 466 default=False, 467 ), 468 ], 469 wait_timeout: Annotated[ 470 int, 471 Field( 472 description="Maximum time to wait for sync completion (seconds).", 473 default=300, 474 ), 475 ], 476) -> str: 477 """Run a sync job on Airbyte Cloud.""" 478 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 479 connection = workspace.get_connection(connection_id=connection_id) 480 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 481 482 if wait: 483 status = sync_result.get_job_status() 484 return ( 485 f"Sync completed with status: {status}. " 486 f"Job ID is '{sync_result.job_id}' and " 487 f"job URL is: {sync_result.job_url}" 488 ) 489 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
Run a sync job on Airbyte Cloud.
492@mcp_tool( 493 domain="cloud", 494 read_only=True, 495 idempotent=True, 496 open_world=True, 497 extra_help_text=CLOUD_AUTH_TIP_TEXT, 498) 499def check_airbyte_cloud_workspace( 500 *, 501 workspace_id: Annotated[ 502 str | None, 503 Field( 504 description=WORKSPACE_ID_TIP_TEXT, 505 default=None, 506 ), 507 ], 508) -> CloudWorkspaceResult: 509 """Check if we have a valid Airbyte Cloud connection and return workspace info. 510 511 Returns workspace details including workspace ID, name, and organization info. 512 """ 513 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 514 515 # Get workspace details from the public API using workspace's credentials 516 workspace_response = api_util.get_workspace( 517 workspace_id=workspace.workspace_id, 518 api_root=workspace.api_root, 519 client_id=workspace.client_id, 520 client_secret=workspace.client_secret, 521 bearer_token=workspace.bearer_token, 522 ) 523 524 # Try to get organization info, but fail gracefully if we don't have permissions. 525 # Fetching organization info requires ORGANIZATION_READER permissions on the organization, 526 # which may not be available with workspace-scoped credentials. 527 organization = workspace.get_organization(raise_on_error=False) 528 529 return CloudWorkspaceResult( 530 workspace_id=workspace_response.workspace_id, 531 workspace_name=workspace_response.name, 532 workspace_url=workspace.workspace_url, 533 organization_id=( 534 organization.organization_id 535 if organization 536 else "[unavailable - requires ORGANIZATION_READER permission]" 537 ), 538 organization_name=organization.organization_name if organization else None, 539 )
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace details including workspace ID, name, and organization info.
542@mcp_tool( 543 domain="cloud", 544 open_world=True, 545 extra_help_text=CLOUD_AUTH_TIP_TEXT, 546) 547def deploy_noop_destination_to_cloud( 548 name: str = "No-op Destination", 549 *, 550 workspace_id: Annotated[ 551 str | None, 552 Field( 553 description=WORKSPACE_ID_TIP_TEXT, 554 default=None, 555 ), 556 ], 557 unique: bool = True, 558) -> str: 559 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 560 destination = get_noop_destination() 561 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 562 deployed_destination = workspace.deploy_destination( 563 name=name, 564 destination=destination, 565 unique=unique, 566 ) 567 register_guid_created_in_session(deployed_destination.connector_id) 568 return ( 569 f"Successfully deployed No-op Destination " 570 f"with ID '{deployed_destination.connector_id}' and " 571 f"URL: {deployed_destination.connector_url}" 572 )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
575@mcp_tool( 576 domain="cloud", 577 read_only=True, 578 idempotent=True, 579 open_world=True, 580 extra_help_text=CLOUD_AUTH_TIP_TEXT, 581) 582def get_cloud_sync_status( 583 connection_id: Annotated[ 584 str, 585 Field( 586 description="The ID of the Airbyte Cloud connection.", 587 ), 588 ], 589 job_id: Annotated[ 590 int | None, 591 Field( 592 description="Optional job ID. If not provided, the latest job will be used.", 593 default=None, 594 ), 595 ], 596 *, 597 workspace_id: Annotated[ 598 str | None, 599 Field( 600 description=WORKSPACE_ID_TIP_TEXT, 601 default=None, 602 ), 603 ], 604 include_attempts: Annotated[ 605 bool, 606 Field( 607 description="Whether to include detailed attempts information.", 608 default=False, 609 ), 610 ], 611) -> dict[str, Any]: 612 """Get the status of a sync job from the Airbyte Cloud.""" 613 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 614 connection = workspace.get_connection(connection_id=connection_id) 615 616 # If a job ID is provided, get the job by ID. 617 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 618 619 if not sync_result: 620 return {"status": None, "job_id": None, "attempts": []} 621 622 result = { 623 "status": sync_result.get_job_status(), 624 "job_id": sync_result.job_id, 625 "bytes_synced": sync_result.bytes_synced, 626 "records_synced": sync_result.records_synced, 627 "start_time": sync_result.start_time.isoformat(), 628 "job_url": sync_result.job_url, 629 "attempts": [], 630 } 631 632 if include_attempts: 633 attempts = sync_result.get_attempts() 634 result["attempts"] = [ 635 { 636 "attempt_number": attempt.attempt_number, 637 "attempt_id": attempt.attempt_id, 638 "status": attempt.status, 639 "bytes_synced": attempt.bytes_synced, 640 "records_synced": attempt.records_synced, 641 "created_at": attempt.created_at.isoformat(), 642 } 643 for attempt in attempts 644 ] 645 646 return result
Get the status of a sync job from the Airbyte Cloud.
649@mcp_tool( 650 domain="cloud", 651 read_only=True, 652 idempotent=True, 653 open_world=True, 654 extra_help_text=CLOUD_AUTH_TIP_TEXT, 655) 656def list_cloud_sync_jobs( 657 connection_id: Annotated[ 658 str, 659 Field(description="The ID of the Airbyte Cloud connection."), 660 ], 661 *, 662 workspace_id: Annotated[ 663 str | None, 664 Field( 665 description=WORKSPACE_ID_TIP_TEXT, 666 default=None, 667 ), 668 ], 669 max_jobs: Annotated[ 670 int, 671 Field( 672 description=( 673 "Maximum number of jobs to return. " 674 "Defaults to 20 if not specified. " 675 "Maximum allowed value is 500." 676 ), 677 default=20, 678 ), 679 ], 680 from_tail: Annotated[ 681 bool | None, 682 Field( 683 description=( 684 "When True, jobs are ordered newest-first (createdAt DESC). " 685 "When False, jobs are ordered oldest-first (createdAt ASC). " 686 "Defaults to True if `jobs_offset` is not specified. " 687 "Cannot combine `from_tail=True` with `jobs_offset`." 688 ), 689 default=None, 690 ), 691 ], 692 jobs_offset: Annotated[ 693 int | None, 694 Field( 695 description=( 696 "Number of jobs to skip from the beginning. " 697 "Cannot be combined with `from_tail=True`." 698 ), 699 default=None, 700 ), 701 ], 702) -> SyncJobListResult: 703 """List sync jobs for a connection with pagination support. 704 705 This tool allows you to retrieve a list of sync jobs for a connection, 706 with control over ordering and pagination. By default, jobs are returned 707 newest-first (from_tail=True). 708 """ 709 # Validate that jobs_offset and from_tail are not both set 710 if jobs_offset is not None and from_tail is True: 711 raise PyAirbyteInputError( 712 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 713 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 714 ) 715 716 # Default to from_tail=True if neither is specified 717 if from_tail is None and jobs_offset is None: 718 from_tail = True 719 elif from_tail is None: 720 from_tail = False 721 722 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 723 connection = workspace.get_connection(connection_id=connection_id) 724 725 # Cap at 500 to avoid overloading agent context 726 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 727 728 sync_results = connection.get_previous_sync_logs( 729 limit=effective_limit, 730 offset=jobs_offset, 731 from_tail=from_tail, 732 ) 733 734 jobs = [ 735 SyncJobResult( 736 job_id=sync_result.job_id, 737 status=str(sync_result.get_job_status()), 738 bytes_synced=sync_result.bytes_synced, 739 records_synced=sync_result.records_synced, 740 start_time=sync_result.start_time.isoformat(), 741 job_url=sync_result.job_url, 742 ) 743 for sync_result in sync_results 744 ] 745 746 return SyncJobListResult( 747 jobs=jobs, 748 jobs_count=len(jobs), 749 jobs_offset=jobs_offset or 0, 750 from_tail=from_tail, 751 )
List sync jobs for a connection with pagination support.
This tool allows you to retrieve a list of sync jobs for a connection, with control over ordering and pagination. By default, jobs are returned newest-first (from_tail=True).
754@mcp_tool( 755 domain="cloud", 756 read_only=True, 757 idempotent=True, 758 open_world=True, 759 extra_help_text=CLOUD_AUTH_TIP_TEXT, 760) 761def list_deployed_cloud_source_connectors( 762 *, 763 workspace_id: Annotated[ 764 str | None, 765 Field( 766 description=WORKSPACE_ID_TIP_TEXT, 767 default=None, 768 ), 769 ], 770 name_contains: Annotated[ 771 str | None, 772 Field( 773 description="Optional case-insensitive substring to filter sources by name", 774 default=None, 775 ), 776 ], 777 max_items_limit: Annotated[ 778 int | None, 779 Field( 780 description="Optional maximum number of items to return (default: no limit)", 781 default=None, 782 ), 783 ], 784) -> list[CloudSourceResult]: 785 """List all deployed source connectors in the Airbyte Cloud workspace.""" 786 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 787 sources = workspace.list_sources() 788 789 # Filter by name if requested 790 if name_contains: 791 needle = name_contains.lower() 792 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 793 794 # Apply limit if requested 795 if max_items_limit is not None: 796 sources = sources[:max_items_limit] 797 798 # Note: name and url are guaranteed non-null from list API responses 799 return [ 800 CloudSourceResult( 801 id=source.source_id, 802 name=cast(str, source.name), 803 url=cast(str, source.connector_url), 804 ) 805 for source in sources 806 ]
List all deployed source connectors in the Airbyte Cloud workspace.
809@mcp_tool( 810 domain="cloud", 811 read_only=True, 812 idempotent=True, 813 open_world=True, 814 extra_help_text=CLOUD_AUTH_TIP_TEXT, 815) 816def list_deployed_cloud_destination_connectors( 817 *, 818 workspace_id: Annotated[ 819 str | None, 820 Field( 821 description=WORKSPACE_ID_TIP_TEXT, 822 default=None, 823 ), 824 ], 825 name_contains: Annotated[ 826 str | None, 827 Field( 828 description="Optional case-insensitive substring to filter destinations by name", 829 default=None, 830 ), 831 ], 832 max_items_limit: Annotated[ 833 int | None, 834 Field( 835 description="Optional maximum number of items to return (default: no limit)", 836 default=None, 837 ), 838 ], 839) -> list[CloudDestinationResult]: 840 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 841 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 842 destinations = workspace.list_destinations() 843 844 # Filter by name if requested 845 if name_contains: 846 needle = name_contains.lower() 847 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 848 849 # Apply limit if requested 850 if max_items_limit is not None: 851 destinations = destinations[:max_items_limit] 852 853 # Note: name and url are guaranteed non-null from list API responses 854 return [ 855 CloudDestinationResult( 856 id=destination.destination_id, 857 name=cast(str, destination.name), 858 url=cast(str, destination.connector_url), 859 ) 860 for destination in destinations 861 ]
List all deployed destination connectors in the Airbyte Cloud workspace.
864@mcp_tool( 865 domain="cloud", 866 read_only=True, 867 idempotent=True, 868 open_world=True, 869 extra_help_text=CLOUD_AUTH_TIP_TEXT, 870) 871def describe_cloud_source( 872 source_id: Annotated[ 873 str, 874 Field(description="The ID of the source to describe."), 875 ], 876 *, 877 workspace_id: Annotated[ 878 str | None, 879 Field( 880 description=WORKSPACE_ID_TIP_TEXT, 881 default=None, 882 ), 883 ], 884) -> CloudSourceDetails: 885 """Get detailed information about a specific deployed source connector.""" 886 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 887 source = workspace.get_source(source_id=source_id) 888 889 # Access name property to ensure _connector_info is populated 890 source_name = cast(str, source.name) 891 892 return CloudSourceDetails( 893 source_id=source.source_id, 894 source_name=source_name, 895 source_url=source.connector_url, 896 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 897 )
Get detailed information about a specific deployed source connector.
900@mcp_tool( 901 domain="cloud", 902 read_only=True, 903 idempotent=True, 904 open_world=True, 905 extra_help_text=CLOUD_AUTH_TIP_TEXT, 906) 907def describe_cloud_destination( 908 destination_id: Annotated[ 909 str, 910 Field(description="The ID of the destination to describe."), 911 ], 912 *, 913 workspace_id: Annotated[ 914 str | None, 915 Field( 916 description=WORKSPACE_ID_TIP_TEXT, 917 default=None, 918 ), 919 ], 920) -> CloudDestinationDetails: 921 """Get detailed information about a specific deployed destination connector.""" 922 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 923 destination = workspace.get_destination(destination_id=destination_id) 924 925 # Access name property to ensure _connector_info is populated 926 destination_name = cast(str, destination.name) 927 928 return CloudDestinationDetails( 929 destination_id=destination.destination_id, 930 destination_name=destination_name, 931 destination_url=destination.connector_url, 932 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 933 )
Get detailed information about a specific deployed destination connector.
936@mcp_tool( 937 domain="cloud", 938 read_only=True, 939 idempotent=True, 940 open_world=True, 941 extra_help_text=CLOUD_AUTH_TIP_TEXT, 942) 943def describe_cloud_connection( 944 connection_id: Annotated[ 945 str, 946 Field(description="The ID of the connection to describe."), 947 ], 948 *, 949 workspace_id: Annotated[ 950 str | None, 951 Field( 952 description=WORKSPACE_ID_TIP_TEXT, 953 default=None, 954 ), 955 ], 956) -> CloudConnectionDetails: 957 """Get detailed information about a specific deployed connection.""" 958 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 959 connection = workspace.get_connection(connection_id=connection_id) 960 961 return CloudConnectionDetails( 962 connection_id=connection.connection_id, 963 connection_name=cast(str, connection.name), 964 connection_url=cast(str, connection.connection_url), 965 source_id=connection.source_id, 966 source_name=cast(str, connection.source.name), 967 destination_id=connection.destination_id, 968 destination_name=cast(str, connection.destination.name), 969 selected_streams=connection.stream_names, 970 table_prefix=connection.table_prefix, 971 )
Get detailed information about a specific deployed connection.
974@mcp_tool( 975 domain="cloud", 976 read_only=True, 977 idempotent=True, 978 open_world=True, 979 extra_help_text=CLOUD_AUTH_TIP_TEXT, 980) 981def get_cloud_sync_logs( 982 connection_id: Annotated[ 983 str, 984 Field(description="The ID of the Airbyte Cloud connection."), 985 ], 986 job_id: Annotated[ 987 int | None, 988 Field(description="Optional job ID. If not provided, the latest job will be used."), 989 ] = None, 990 attempt_number: Annotated[ 991 int | None, 992 Field( 993 description="Optional attempt number. If not provided, the latest attempt will be used." 994 ), 995 ] = None, 996 *, 997 workspace_id: Annotated[ 998 str | None, 999 Field( 1000 description=WORKSPACE_ID_TIP_TEXT, 1001 default=None, 1002 ), 1003 ], 1004 max_lines: Annotated[ 1005 int, 1006 Field( 1007 description=( 1008 "Maximum number of lines to return. " 1009 "Defaults to 4000 if not specified. " 1010 "If '0' is provided, no limit is applied." 1011 ), 1012 default=4000, 1013 ), 1014 ], 1015 from_tail: Annotated[ 1016 bool | None, 1017 Field( 1018 description=( 1019 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 1020 "Defaults to True if `line_offset` is not specified. " 1021 "Cannot combine `from_tail=True` with `line_offset`." 1022 ), 1023 default=None, 1024 ), 1025 ], 1026 line_offset: Annotated[ 1027 int | None, 1028 Field( 1029 description=( 1030 "Number of lines to skip from the beginning of the logs. " 1031 "Cannot be combined with `from_tail=True`." 1032 ), 1033 default=None, 1034 ), 1035 ], 1036) -> LogReadResult: 1037 """Get the logs from a sync job attempt on Airbyte Cloud.""" 1038 # Validate that line_offset and from_tail are not both set 1039 if line_offset is not None and from_tail: 1040 raise PyAirbyteInputError( 1041 message="Cannot specify both 'line_offset' and 'from_tail' parameters.", 1042 context={"line_offset": line_offset, "from_tail": from_tail}, 1043 ) 1044 1045 if from_tail is None and line_offset is None: 1046 from_tail = True 1047 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1048 connection = workspace.get_connection(connection_id=connection_id) 1049 1050 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 1051 1052 if not sync_result: 1053 raise AirbyteMissingResourceError( 1054 resource_type="sync job", 1055 resource_name_or_id=connection_id, 1056 ) 1057 1058 attempts = sync_result.get_attempts() 1059 1060 if not attempts: 1061 raise AirbyteMissingResourceError( 1062 resource_type="sync attempt", 1063 resource_name_or_id=str(sync_result.job_id), 1064 ) 1065 1066 if attempt_number is not None: 1067 target_attempt = None 1068 for attempt in attempts: 1069 if attempt.attempt_number == attempt_number: 1070 target_attempt = attempt 1071 break 1072 1073 if target_attempt is None: 1074 raise AirbyteMissingResourceError( 1075 resource_type="sync attempt", 1076 resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}", 1077 ) 1078 else: 1079 target_attempt = max(attempts, key=lambda a: a.attempt_number) 1080 1081 logs = target_attempt.get_full_log_text() 1082 1083 if not logs: 1084 # Return empty result with zero lines 1085 return LogReadResult( 1086 log_text=( 1087 f"[No logs available for job '{sync_result.job_id}', " 1088 f"attempt {target_attempt.attempt_number}.]" 1089 ), 1090 log_text_start_line=1, 1091 log_text_line_count=0, 1092 total_log_lines_available=0, 1093 job_id=sync_result.job_id, 1094 attempt_number=target_attempt.attempt_number, 1095 ) 1096 1097 # Apply line limiting 1098 log_lines = logs.splitlines() 1099 total_lines = len(log_lines) 1100 1101 # Determine effective max_lines (0 means no limit) 1102 effective_max = total_lines if max_lines == 0 else max_lines 1103 1104 # Calculate start_index and slice based on from_tail or line_offset 1105 if from_tail: 1106 start_index = max(0, total_lines - effective_max) 1107 selected_lines = log_lines[start_index:][:effective_max] 1108 else: 1109 start_index = line_offset or 0 1110 selected_lines = log_lines[start_index : start_index + effective_max] 1111 1112 return LogReadResult( 1113 log_text="\n".join(selected_lines), 1114 log_text_start_line=start_index + 1, # Convert to 1-based index 1115 log_text_line_count=len(selected_lines), 1116 total_log_lines_available=total_lines, 1117 job_id=sync_result.job_id, 1118 attempt_number=target_attempt.attempt_number, 1119 )
Get the logs from a sync job attempt on Airbyte Cloud.
1122@mcp_tool( 1123 domain="cloud", 1124 read_only=True, 1125 idempotent=True, 1126 open_world=True, 1127 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1128) 1129def list_deployed_cloud_connections( 1130 *, 1131 workspace_id: Annotated[ 1132 str | None, 1133 Field( 1134 description=WORKSPACE_ID_TIP_TEXT, 1135 default=None, 1136 ), 1137 ], 1138 name_contains: Annotated[ 1139 str | None, 1140 Field( 1141 description="Optional case-insensitive substring to filter connections by name", 1142 default=None, 1143 ), 1144 ], 1145 max_items_limit: Annotated[ 1146 int | None, 1147 Field( 1148 description="Optional maximum number of items to return (default: no limit)", 1149 default=None, 1150 ), 1151 ], 1152 with_connection_status: Annotated[ 1153 bool | None, 1154 Field( 1155 description="If True, include status info for each connection's most recent sync job", 1156 default=False, 1157 ), 1158 ], 1159 failing_connections_only: Annotated[ 1160 bool | None, 1161 Field( 1162 description="If True, only return connections with failed/cancelled last sync", 1163 default=False, 1164 ), 1165 ], 1166) -> list[CloudConnectionResult]: 1167 """List all deployed connections in the Airbyte Cloud workspace. 1168 1169 When with_connection_status is True, each connection result will include 1170 information about the most recent sync job status, skipping over any 1171 currently in-progress syncs to find the last completed job. 1172 1173 When failing_connections_only is True, only connections where the most 1174 recent completed sync job failed or was cancelled will be returned. 1175 This implicitly enables with_connection_status. 1176 """ 1177 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1178 connections = workspace.list_connections() 1179 1180 # Filter by name if requested 1181 if name_contains: 1182 needle = name_contains.lower() 1183 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1184 1185 # If failing_connections_only is True, implicitly enable with_connection_status 1186 if failing_connections_only: 1187 with_connection_status = True 1188 1189 results: list[CloudConnectionResult] = [] 1190 1191 for connection in connections: 1192 last_job_status: str | None = None 1193 last_job_id: int | None = None 1194 last_job_time: str | None = None 1195 currently_running_job_id: int | None = None 1196 currently_running_job_start_time: str | None = None 1197 1198 if with_connection_status: 1199 sync_logs = connection.get_previous_sync_logs(limit=5) 1200 last_completed_job_status = None # Keep enum for comparison 1201 1202 for sync_result in sync_logs: 1203 job_status = sync_result.get_job_status() 1204 1205 if not sync_result.is_job_complete(): 1206 currently_running_job_id = sync_result.job_id 1207 currently_running_job_start_time = sync_result.start_time.isoformat() 1208 continue 1209 1210 last_completed_job_status = job_status 1211 last_job_status = str(job_status.value) if job_status else None 1212 last_job_id = sync_result.job_id 1213 last_job_time = sync_result.start_time.isoformat() 1214 break 1215 1216 if failing_connections_only and ( 1217 last_completed_job_status is None 1218 or last_completed_job_status not in FAILED_STATUSES 1219 ): 1220 continue 1221 1222 results.append( 1223 CloudConnectionResult( 1224 id=connection.connection_id, 1225 name=cast(str, connection.name), 1226 url=cast(str, connection.connection_url), 1227 source_id=connection.source_id, 1228 destination_id=connection.destination_id, 1229 last_job_status=last_job_status, 1230 last_job_id=last_job_id, 1231 last_job_time=last_job_time, 1232 currently_running_job_id=currently_running_job_id, 1233 currently_running_job_start_time=currently_running_job_start_time, 1234 ) 1235 ) 1236 1237 if max_items_limit is not None and len(results) >= max_items_limit: 1238 break 1239 1240 return results
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.
1347@mcp_tool( 1348 domain="cloud", 1349 read_only=True, 1350 idempotent=True, 1351 open_world=True, 1352 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1353) 1354def list_cloud_workspaces( 1355 *, 1356 organization_id: Annotated[ 1357 str | None, 1358 Field( 1359 description="Organization ID. Required if organization_name is not provided.", 1360 default=None, 1361 ), 1362 ], 1363 organization_name: Annotated[ 1364 str | None, 1365 Field( 1366 description=( 1367 "Organization name (exact match). " "Required if organization_id is not provided." 1368 ), 1369 default=None, 1370 ), 1371 ], 1372 name_contains: Annotated[ 1373 str | None, 1374 Field( 1375 description="Optional substring to filter workspaces by name (server-side filtering)", 1376 default=None, 1377 ), 1378 ], 1379 max_items_limit: Annotated[ 1380 int | None, 1381 Field( 1382 description="Optional maximum number of items to return (default: no limit)", 1383 default=None, 1384 ), 1385 ], 1386) -> list[CloudWorkspaceResult]: 1387 """List all workspaces in a specific organization. 1388 1389 Requires either organization_id OR organization_name (exact match) to be provided. 1390 This tool will NOT list workspaces across all organizations - you must specify 1391 which organization to list workspaces from. 1392 """ 1393 credentials = resolve_cloud_credentials() 1394 1395 resolved_org_id = _resolve_organization_id( 1396 organization_id=organization_id, 1397 organization_name=organization_name, 1398 api_root=credentials.api_root, 1399 client_id=credentials.client_id, 1400 client_secret=credentials.client_secret, 1401 bearer_token=credentials.bearer_token, 1402 ) 1403 1404 workspaces = api_util.list_workspaces_in_organization( 1405 organization_id=resolved_org_id, 1406 api_root=credentials.api_root, 1407 client_id=credentials.client_id, 1408 client_secret=credentials.client_secret, 1409 bearer_token=credentials.bearer_token, 1410 name_contains=name_contains, 1411 max_items_limit=max_items_limit, 1412 ) 1413 1414 return [ 1415 CloudWorkspaceResult( 1416 workspace_id=ws.get("workspaceId", ""), 1417 workspace_name=ws.get("name", ""), 1418 organization_id=ws.get("organizationId", ""), 1419 ) 1420 for ws in workspaces 1421 ]
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided. This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.
1424@mcp_tool( 1425 domain="cloud", 1426 read_only=True, 1427 idempotent=True, 1428 open_world=True, 1429 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1430) 1431def describe_cloud_organization( 1432 *, 1433 organization_id: Annotated[ 1434 str | None, 1435 Field( 1436 description="Organization ID. Required if organization_name is not provided.", 1437 default=None, 1438 ), 1439 ], 1440 organization_name: Annotated[ 1441 str | None, 1442 Field( 1443 description=( 1444 "Organization name (exact match). " "Required if organization_id is not provided." 1445 ), 1446 default=None, 1447 ), 1448 ], 1449) -> CloudOrganizationResult: 1450 """Get details about a specific organization. 1451 1452 Requires either organization_id OR organization_name (exact match) to be provided. 1453 This tool is useful for looking up an organization's ID from its name, or vice versa. 1454 """ 1455 credentials = resolve_cloud_credentials() 1456 1457 org = _resolve_organization( 1458 organization_id=organization_id, 1459 organization_name=organization_name, 1460 api_root=credentials.api_root, 1461 client_id=credentials.client_id, 1462 client_secret=credentials.client_secret, 1463 bearer_token=credentials.bearer_token, 1464 ) 1465 1466 return CloudOrganizationResult( 1467 id=org.organization_id, 1468 name=org.organization_name, 1469 email=org.email, 1470 )
Get details about a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided. This tool is useful for looking up an organization's ID from its name, or vice versa.
1487@mcp_tool( 1488 domain="cloud", 1489 open_world=True, 1490 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1491) 1492def publish_custom_source_definition( 1493 name: Annotated[ 1494 str, 1495 Field(description="The name for the custom connector definition."), 1496 ], 1497 *, 1498 workspace_id: Annotated[ 1499 str | None, 1500 Field( 1501 description=WORKSPACE_ID_TIP_TEXT, 1502 default=None, 1503 ), 1504 ], 1505 manifest_yaml: Annotated[ 1506 str | Path | None, 1507 Field( 1508 description=( 1509 "The Low-code CDK manifest as a YAML string or file path. " 1510 "Required for YAML connectors." 1511 ), 1512 default=None, 1513 ), 1514 ] = None, 1515 unique: Annotated[ 1516 bool, 1517 Field( 1518 description="Whether to require a unique name.", 1519 default=True, 1520 ), 1521 ] = True, 1522 pre_validate: Annotated[ 1523 bool, 1524 Field( 1525 description="Whether to validate the manifest client-side before publishing.", 1526 default=True, 1527 ), 1528 ] = True, 1529 testing_values: Annotated[ 1530 dict | str | None, 1531 Field( 1532 description=( 1533 "Optional testing configuration values for the Builder UI. " 1534 "Can be provided as a JSON object or JSON string. " 1535 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1536 "If provided, these values replace any existing testing values " 1537 "for the connector builder project, allowing immediate test read operations." 1538 ), 1539 default=None, 1540 ), 1541 ], 1542 testing_values_secret_name: Annotated[ 1543 str | None, 1544 Field( 1545 description=( 1546 "Optional name of a secret containing testing configuration values " 1547 "in JSON or YAML format. The secret will be resolved by the MCP " 1548 "server and merged into testing_values, with secret values taking " 1549 "precedence. This lets the agent reference secrets without sending " 1550 "raw values as tool arguments." 1551 ), 1552 default=None, 1553 ), 1554 ], 1555) -> str: 1556 """Publish a custom YAML source connector definition to Airbyte Cloud. 1557 1558 Note: Only YAML (declarative) connectors are currently supported. 1559 Docker-based custom sources are not yet available. 1560 """ 1561 processed_manifest = manifest_yaml 1562 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1563 processed_manifest = Path(manifest_yaml) 1564 1565 # Resolve testing values from inline config and/or secret 1566 testing_values_dict: dict[str, Any] | None = None 1567 if testing_values is not None or testing_values_secret_name is not None: 1568 testing_values_dict = ( 1569 resolve_config( 1570 config=testing_values, 1571 config_secret_name=testing_values_secret_name, 1572 ) 1573 or None 1574 ) 1575 1576 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1577 custom_source = workspace.publish_custom_source_definition( 1578 name=name, 1579 manifest_yaml=processed_manifest, 1580 unique=unique, 1581 pre_validate=pre_validate, 1582 testing_values=testing_values_dict, 1583 ) 1584 register_guid_created_in_session(custom_source.definition_id) 1585 return ( 1586 "Successfully published custom YAML source definition:\n" 1587 + _get_custom_source_definition_description( 1588 custom_source=custom_source, 1589 ) 1590 + "\n" 1591 )
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1594@mcp_tool( 1595 domain="cloud", 1596 read_only=True, 1597 idempotent=True, 1598 open_world=True, 1599) 1600def list_custom_source_definitions( 1601 *, 1602 workspace_id: Annotated[ 1603 str | None, 1604 Field( 1605 description=WORKSPACE_ID_TIP_TEXT, 1606 default=None, 1607 ), 1608 ], 1609) -> list[dict[str, Any]]: 1610 """List custom YAML source definitions in the Airbyte Cloud workspace. 1611 1612 Note: Only YAML (declarative) connectors are currently supported. 1613 Docker-based custom sources are not yet available. 1614 """ 1615 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1616 definitions = workspace.list_custom_source_definitions( 1617 definition_type="yaml", 1618 ) 1619 1620 return [ 1621 { 1622 "definition_id": d.definition_id, 1623 "name": d.name, 1624 "version": d.version, 1625 "connector_builder_project_url": d.connector_builder_project_url, 1626 } 1627 for d in definitions 1628 ]
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1631@mcp_tool( 1632 domain="cloud", 1633 read_only=True, 1634 idempotent=True, 1635 open_world=True, 1636) 1637def get_custom_source_definition( 1638 definition_id: Annotated[ 1639 str, 1640 Field(description="The ID of the custom source definition to retrieve."), 1641 ], 1642 *, 1643 workspace_id: Annotated[ 1644 str | None, 1645 Field( 1646 description=WORKSPACE_ID_TIP_TEXT, 1647 default=None, 1648 ), 1649 ], 1650) -> dict[str, Any]: 1651 """Get a custom YAML source definition from Airbyte Cloud, including its manifest. 1652 1653 Returns the full definition details including the manifest YAML content, 1654 which can be used to inspect or store the connector configuration locally. 1655 1656 Note: Only YAML (declarative) connectors are currently supported. 1657 Docker-based custom sources are not yet available. 1658 """ 1659 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1660 definition = workspace.get_custom_source_definition( 1661 definition_id=definition_id, 1662 definition_type="yaml", 1663 ) 1664 1665 return { 1666 "definition_id": definition.definition_id, 1667 "name": definition.name, 1668 "version": definition.version, 1669 "connector_builder_project_id": definition.connector_builder_project_id, 1670 "connector_builder_project_url": definition.connector_builder_project_url, 1671 "manifest": definition.manifest, 1672 }
Get a custom YAML source definition from Airbyte Cloud, including its manifest.
Returns the full definition details including the manifest YAML content, which can be used to inspect or store the connector configuration locally.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1675@mcp_tool( 1676 domain="cloud", 1677 destructive=True, 1678 open_world=True, 1679) 1680def update_custom_source_definition( 1681 definition_id: Annotated[ 1682 str, 1683 Field(description="The ID of the definition to update."), 1684 ], 1685 manifest_yaml: Annotated[ 1686 str | Path | None, 1687 Field( 1688 description=( 1689 "New manifest as YAML string or file path. " 1690 "Optional; omit to update only testing values." 1691 ), 1692 default=None, 1693 ), 1694 ] = None, 1695 *, 1696 workspace_id: Annotated[ 1697 str | None, 1698 Field( 1699 description=WORKSPACE_ID_TIP_TEXT, 1700 default=None, 1701 ), 1702 ], 1703 pre_validate: Annotated[ 1704 bool, 1705 Field( 1706 description="Whether to validate the manifest client-side before updating.", 1707 default=True, 1708 ), 1709 ] = True, 1710 testing_values: Annotated[ 1711 dict | str | None, 1712 Field( 1713 description=( 1714 "Optional testing configuration values for the Builder UI. " 1715 "Can be provided as a JSON object or JSON string. " 1716 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1717 "If provided, these values replace any existing testing values " 1718 "for the connector builder project. The entire testing values object " 1719 "is overwritten, so pass the full set of values you want to persist." 1720 ), 1721 default=None, 1722 ), 1723 ], 1724 testing_values_secret_name: Annotated[ 1725 str | None, 1726 Field( 1727 description=( 1728 "Optional name of a secret containing testing configuration values " 1729 "in JSON or YAML format. The secret will be resolved by the MCP " 1730 "server and merged into testing_values, with secret values taking " 1731 "precedence. This lets the agent reference secrets without sending " 1732 "raw values as tool arguments." 1733 ), 1734 default=None, 1735 ), 1736 ], 1737) -> str: 1738 """Update a custom YAML source definition in Airbyte Cloud. 1739 1740 Updates the manifest and/or testing values for an existing custom source definition. 1741 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1742 """ 1743 check_guid_created_in_session(definition_id) 1744 1745 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1746 1747 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1748 raise PyAirbyteInputError( 1749 message=( 1750 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1751 "must be provided to update a custom source definition." 1752 ), 1753 context={ 1754 "definition_id": definition_id, 1755 "workspace_id": workspace.workspace_id, 1756 }, 1757 ) 1758 1759 processed_manifest: str | Path | None = manifest_yaml 1760 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1761 processed_manifest = Path(manifest_yaml) 1762 1763 # Resolve testing values from inline config and/or secret 1764 testing_values_dict: dict[str, Any] | None = None 1765 if testing_values is not None or testing_values_secret_name is not None: 1766 testing_values_dict = ( 1767 resolve_config( 1768 config=testing_values, 1769 config_secret_name=testing_values_secret_name, 1770 ) 1771 or None 1772 ) 1773 1774 definition = workspace.get_custom_source_definition( 1775 definition_id=definition_id, 1776 definition_type="yaml", 1777 ) 1778 custom_source: CustomCloudSourceDefinition = definition 1779 1780 if processed_manifest is not None: 1781 custom_source = definition.update_definition( 1782 manifest_yaml=processed_manifest, 1783 pre_validate=pre_validate, 1784 ) 1785 1786 if testing_values_dict is not None: 1787 custom_source.set_testing_values(testing_values_dict) 1788 1789 return ( 1790 "Successfully updated custom YAML source definition:\n" 1791 + _get_custom_source_definition_description( 1792 custom_source=custom_source, 1793 ) 1794 )
Update a custom YAML source definition in Airbyte Cloud.
Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1797@mcp_tool( 1798 domain="cloud", 1799 destructive=True, 1800 open_world=True, 1801) 1802def permanently_delete_custom_source_definition( 1803 definition_id: Annotated[ 1804 str, 1805 Field(description="The ID of the custom source definition to delete."), 1806 ], 1807 name: Annotated[ 1808 str, 1809 Field(description="The expected name of the custom source definition (for verification)."), 1810 ], 1811 *, 1812 workspace_id: Annotated[ 1813 str | None, 1814 Field( 1815 description=WORKSPACE_ID_TIP_TEXT, 1816 default=None, 1817 ), 1818 ], 1819) -> str: 1820 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1821 1822 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1823 (case insensitive). 1824 1825 If the connector does not meet this requirement, the deletion will be rejected with a 1826 helpful error message. Instruct the user to rename the connector appropriately to authorize 1827 the deletion. 1828 1829 The provided name must match the actual name of the definition for the operation to proceed. 1830 This is a safety measure to ensure you are deleting the correct resource. 1831 1832 Note: Only YAML (declarative) connectors are currently supported. 1833 Docker-based custom sources are not yet available. 1834 """ 1835 check_guid_created_in_session(definition_id) 1836 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1837 definition = workspace.get_custom_source_definition( 1838 definition_id=definition_id, 1839 definition_type="yaml", 1840 ) 1841 actual_name: str = definition.name 1842 1843 # Verify the name matches 1844 if actual_name != name: 1845 raise PyAirbyteInputError( 1846 message=( 1847 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1848 "The provided name must exactly match the definition's actual name. " 1849 "This is a safety measure to prevent accidental deletion." 1850 ), 1851 context={ 1852 "definition_id": definition_id, 1853 "expected_name": name, 1854 "actual_name": actual_name, 1855 }, 1856 ) 1857 1858 definition.permanently_delete( 1859 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1860 ) 1861 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1864@mcp_tool( 1865 domain="cloud", 1866 destructive=True, 1867 open_world=True, 1868 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1869) 1870def permanently_delete_cloud_source( 1871 source_id: Annotated[ 1872 str, 1873 Field(description="The ID of the deployed source to delete."), 1874 ], 1875 name: Annotated[ 1876 str, 1877 Field(description="The expected name of the source (for verification)."), 1878 ], 1879) -> str: 1880 """Permanently delete a deployed source connector from Airbyte Cloud. 1881 1882 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 1883 (case insensitive). 1884 1885 If the source does not meet this requirement, the deletion will be rejected with a 1886 helpful error message. Instruct the user to rename the source appropriately to authorize 1887 the deletion. 1888 1889 The provided name must match the actual name of the source for the operation to proceed. 1890 This is a safety measure to ensure you are deleting the correct resource. 1891 """ 1892 check_guid_created_in_session(source_id) 1893 workspace: CloudWorkspace = _get_cloud_workspace() 1894 source = workspace.get_source(source_id=source_id) 1895 actual_name: str = cast(str, source.name) 1896 1897 # Verify the name matches 1898 if actual_name != name: 1899 raise PyAirbyteInputError( 1900 message=( 1901 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1902 "The provided name must exactly match the source's actual name. " 1903 "This is a safety measure to prevent accidental deletion." 1904 ), 1905 context={ 1906 "source_id": source_id, 1907 "expected_name": name, 1908 "actual_name": actual_name, 1909 }, 1910 ) 1911 1912 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1913 workspace.permanently_delete_source( 1914 source=source_id, 1915 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 1916 ) 1917 return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.
The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
1920@mcp_tool( 1921 domain="cloud", 1922 destructive=True, 1923 open_world=True, 1924 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1925) 1926def permanently_delete_cloud_destination( 1927 destination_id: Annotated[ 1928 str, 1929 Field(description="The ID of the deployed destination to delete."), 1930 ], 1931 name: Annotated[ 1932 str, 1933 Field(description="The expected name of the destination (for verification)."), 1934 ], 1935) -> str: 1936 """Permanently delete a deployed destination connector from Airbyte Cloud. 1937 1938 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 1939 (case insensitive). 1940 1941 If the destination does not meet this requirement, the deletion will be rejected with a 1942 helpful error message. Instruct the user to rename the destination appropriately to authorize 1943 the deletion. 1944 1945 The provided name must match the actual name of the destination for the operation to proceed. 1946 This is a safety measure to ensure you are deleting the correct resource. 1947 """ 1948 check_guid_created_in_session(destination_id) 1949 workspace: CloudWorkspace = _get_cloud_workspace() 1950 destination = workspace.get_destination(destination_id=destination_id) 1951 actual_name: str = cast(str, destination.name) 1952 1953 # Verify the name matches 1954 if actual_name != name: 1955 raise PyAirbyteInputError( 1956 message=( 1957 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1958 "The provided name must exactly match the destination's actual name. " 1959 "This is a safety measure to prevent accidental deletion." 1960 ), 1961 context={ 1962 "destination_id": destination_id, 1963 "expected_name": name, 1964 "actual_name": actual_name, 1965 }, 1966 ) 1967 1968 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1969 workspace.permanently_delete_destination( 1970 destination=destination_id, 1971 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 1972 ) 1973 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.
The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
1976@mcp_tool( 1977 domain="cloud", 1978 destructive=True, 1979 open_world=True, 1980 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1981) 1982def permanently_delete_cloud_connection( 1983 connection_id: Annotated[ 1984 str, 1985 Field(description="The ID of the connection to delete."), 1986 ], 1987 name: Annotated[ 1988 str, 1989 Field(description="The expected name of the connection (for verification)."), 1990 ], 1991 *, 1992 cascade_delete_source: Annotated[ 1993 bool, 1994 Field( 1995 description=( 1996 "Whether to also delete the source connector associated with this connection." 1997 ), 1998 default=False, 1999 ), 2000 ] = False, 2001 cascade_delete_destination: Annotated[ 2002 bool, 2003 Field( 2004 description=( 2005 "Whether to also delete the destination connector associated with this connection." 2006 ), 2007 default=False, 2008 ), 2009 ] = False, 2010) -> str: 2011 """Permanently delete a connection from Airbyte Cloud. 2012 2013 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 2014 (case insensitive). 2015 2016 If the connection does not meet this requirement, the deletion will be rejected with a 2017 helpful error message. Instruct the user to rename the connection appropriately to authorize 2018 the deletion. 2019 2020 The provided name must match the actual name of the connection for the operation to proceed. 2021 This is a safety measure to ensure you are deleting the correct resource. 2022 """ 2023 check_guid_created_in_session(connection_id) 2024 workspace: CloudWorkspace = _get_cloud_workspace() 2025 connection = workspace.get_connection(connection_id=connection_id) 2026 actual_name: str = cast(str, connection.name) 2027 2028 # Verify the name matches 2029 if actual_name != name: 2030 raise PyAirbyteInputError( 2031 message=( 2032 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2033 "The provided name must exactly match the connection's actual name. " 2034 "This is a safety measure to prevent accidental deletion." 2035 ), 2036 context={ 2037 "connection_id": connection_id, 2038 "expected_name": name, 2039 "actual_name": actual_name, 2040 }, 2041 ) 2042 2043 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2044 workspace.permanently_delete_connection( 2045 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2046 connection=connection_id, 2047 cascade_delete_source=cascade_delete_source, 2048 cascade_delete_destination=cascade_delete_destination, 2049 ) 2050 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.
The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
2053@mcp_tool( 2054 domain="cloud", 2055 open_world=True, 2056 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2057) 2058def rename_cloud_source( 2059 source_id: Annotated[ 2060 str, 2061 Field(description="The ID of the deployed source to rename."), 2062 ], 2063 name: Annotated[ 2064 str, 2065 Field(description="New name for the source."), 2066 ], 2067 *, 2068 workspace_id: Annotated[ 2069 str | None, 2070 Field( 2071 description=WORKSPACE_ID_TIP_TEXT, 2072 default=None, 2073 ), 2074 ], 2075) -> str: 2076 """Rename a deployed source connector on Airbyte Cloud.""" 2077 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2078 source = workspace.get_source(source_id=source_id) 2079 source.rename(name=name) 2080 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
Rename a deployed source connector on Airbyte Cloud.
2083@mcp_tool( 2084 domain="cloud", 2085 destructive=True, 2086 open_world=True, 2087 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2088) 2089def update_cloud_source_config( 2090 source_id: Annotated[ 2091 str, 2092 Field(description="The ID of the deployed source to update."), 2093 ], 2094 config: Annotated[ 2095 dict | str, 2096 Field( 2097 description="New configuration for the source connector.", 2098 ), 2099 ], 2100 config_secret_name: Annotated[ 2101 str | None, 2102 Field( 2103 description="The name of the secret containing the configuration.", 2104 default=None, 2105 ), 2106 ] = None, 2107 *, 2108 workspace_id: Annotated[ 2109 str | None, 2110 Field( 2111 description=WORKSPACE_ID_TIP_TEXT, 2112 default=None, 2113 ), 2114 ], 2115) -> str: 2116 """Update a deployed source connector's configuration on Airbyte Cloud. 2117 2118 This is a destructive operation that can break existing connections if the 2119 configuration is changed incorrectly. Use with caution. 2120 """ 2121 check_guid_created_in_session(source_id) 2122 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2123 source = workspace.get_source(source_id=source_id) 2124 2125 config_dict = resolve_config( 2126 config=config, 2127 config_secret_name=config_secret_name, 2128 config_spec_jsonschema=None, # We don't have the spec here 2129 ) 2130 2131 source.update_config(config=config_dict) 2132 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
2135@mcp_tool( 2136 domain="cloud", 2137 open_world=True, 2138 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2139) 2140def rename_cloud_destination( 2141 destination_id: Annotated[ 2142 str, 2143 Field(description="The ID of the deployed destination to rename."), 2144 ], 2145 name: Annotated[ 2146 str, 2147 Field(description="New name for the destination."), 2148 ], 2149 *, 2150 workspace_id: Annotated[ 2151 str | None, 2152 Field( 2153 description=WORKSPACE_ID_TIP_TEXT, 2154 default=None, 2155 ), 2156 ], 2157) -> str: 2158 """Rename a deployed destination connector on Airbyte Cloud.""" 2159 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2160 destination = workspace.get_destination(destination_id=destination_id) 2161 destination.rename(name=name) 2162 return ( 2163 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2164 f"URL: {destination.connector_url}" 2165 )
Rename a deployed destination connector on Airbyte Cloud.
2168@mcp_tool( 2169 domain="cloud", 2170 destructive=True, 2171 open_world=True, 2172 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2173) 2174def update_cloud_destination_config( 2175 destination_id: Annotated[ 2176 str, 2177 Field(description="The ID of the deployed destination to update."), 2178 ], 2179 config: Annotated[ 2180 dict | str, 2181 Field( 2182 description="New configuration for the destination connector.", 2183 ), 2184 ], 2185 config_secret_name: Annotated[ 2186 str | None, 2187 Field( 2188 description="The name of the secret containing the configuration.", 2189 default=None, 2190 ), 2191 ], 2192 *, 2193 workspace_id: Annotated[ 2194 str | None, 2195 Field( 2196 description=WORKSPACE_ID_TIP_TEXT, 2197 default=None, 2198 ), 2199 ], 2200) -> str: 2201 """Update a deployed destination connector's configuration on Airbyte Cloud. 2202 2203 This is a destructive operation that can break existing connections if the 2204 configuration is changed incorrectly. Use with caution. 2205 """ 2206 check_guid_created_in_session(destination_id) 2207 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2208 destination = workspace.get_destination(destination_id=destination_id) 2209 2210 config_dict = resolve_config( 2211 config=config, 2212 config_secret_name=config_secret_name, 2213 config_spec_jsonschema=None, # We don't have the spec here 2214 ) 2215 2216 destination.update_config(config=config_dict) 2217 return ( 2218 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2219 )
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
2222@mcp_tool( 2223 domain="cloud", 2224 open_world=True, 2225 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2226) 2227def rename_cloud_connection( 2228 connection_id: Annotated[ 2229 str, 2230 Field(description="The ID of the connection to rename."), 2231 ], 2232 name: Annotated[ 2233 str, 2234 Field(description="New name for the connection."), 2235 ], 2236 *, 2237 workspace_id: Annotated[ 2238 str | None, 2239 Field( 2240 description=WORKSPACE_ID_TIP_TEXT, 2241 default=None, 2242 ), 2243 ], 2244) -> str: 2245 """Rename a connection on Airbyte Cloud.""" 2246 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2247 connection = workspace.get_connection(connection_id=connection_id) 2248 connection.rename(name=name) 2249 return ( 2250 f"Successfully renamed connection '{connection_id}' to '{name}'. " 2251 f"URL: {connection.connection_url}" 2252 )
Rename a connection on Airbyte Cloud.
2255@mcp_tool( 2256 domain="cloud", 2257 destructive=True, 2258 open_world=True, 2259 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2260) 2261def set_cloud_connection_table_prefix( 2262 connection_id: Annotated[ 2263 str, 2264 Field(description="The ID of the connection to update."), 2265 ], 2266 prefix: Annotated[ 2267 str, 2268 Field(description="New table prefix to use when syncing to the destination."), 2269 ], 2270 *, 2271 workspace_id: Annotated[ 2272 str | None, 2273 Field( 2274 description=WORKSPACE_ID_TIP_TEXT, 2275 default=None, 2276 ), 2277 ], 2278) -> str: 2279 """Set the table prefix for a connection on Airbyte Cloud. 2280 2281 This is a destructive operation that can break downstream dependencies if the 2282 table prefix is changed incorrectly. Use with caution. 2283 """ 2284 check_guid_created_in_session(connection_id) 2285 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2286 connection = workspace.get_connection(connection_id=connection_id) 2287 connection.set_table_prefix(prefix=prefix) 2288 return ( 2289 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2290 f"URL: {connection.connection_url}" 2291 )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
2294@mcp_tool( 2295 domain="cloud", 2296 destructive=True, 2297 open_world=True, 2298 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2299) 2300def set_cloud_connection_selected_streams( 2301 connection_id: Annotated[ 2302 str, 2303 Field(description="The ID of the connection to update."), 2304 ], 2305 stream_names: Annotated[ 2306 str | list[str], 2307 Field( 2308 description=( 2309 "The selected stream names to sync within the connection. " 2310 "Must be an explicit stream name or list of streams." 2311 ) 2312 ), 2313 ], 2314 *, 2315 workspace_id: Annotated[ 2316 str | None, 2317 Field( 2318 description=WORKSPACE_ID_TIP_TEXT, 2319 default=None, 2320 ), 2321 ], 2322) -> str: 2323 """Set the selected streams for a connection on Airbyte Cloud. 2324 2325 This is a destructive operation that can break existing connections if the 2326 stream selection is changed incorrectly. Use with caution. 2327 """ 2328 check_guid_created_in_session(connection_id) 2329 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2330 connection = workspace.get_connection(connection_id=connection_id) 2331 2332 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2333 connection.set_selected_streams(stream_names=resolved_streams_list) 2334 2335 return ( 2336 f"Successfully set selected streams for connection '{connection_id}' " 2337 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2338 )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
2341@mcp_tool( 2342 domain="cloud", 2343 open_world=True, 2344 destructive=True, 2345 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2346) 2347def update_cloud_connection( 2348 connection_id: Annotated[ 2349 str, 2350 Field(description="The ID of the connection to update."), 2351 ], 2352 *, 2353 enabled: Annotated[ 2354 bool | None, 2355 Field( 2356 description=( 2357 "Set the connection's enabled status. " 2358 "True enables the connection (status='active'), " 2359 "False disables it (status='inactive'). " 2360 "Leave unset to keep the current status." 2361 ), 2362 default=None, 2363 ), 2364 ], 2365 cron_expression: Annotated[ 2366 str | None, 2367 Field( 2368 description=( 2369 "A cron expression defining when syncs should run. " 2370 "Examples: '0 0 * * *' (daily at midnight UTC), " 2371 "'0 */6 * * *' (every 6 hours), " 2372 "'0 0 * * 0' (weekly on Sunday at midnight UTC). " 2373 "Leave unset to keep the current schedule. " 2374 "Cannot be used together with 'manual_schedule'." 2375 ), 2376 default=None, 2377 ), 2378 ], 2379 manual_schedule: Annotated[ 2380 bool | None, 2381 Field( 2382 description=( 2383 "Set to True to disable automatic syncs (manual scheduling only). " 2384 "Syncs will only run when manually triggered. " 2385 "Cannot be used together with 'cron_expression'." 2386 ), 2387 default=None, 2388 ), 2389 ], 2390 workspace_id: Annotated[ 2391 str | None, 2392 Field( 2393 description=WORKSPACE_ID_TIP_TEXT, 2394 default=None, 2395 ), 2396 ], 2397) -> str: 2398 """Update a connection's settings on Airbyte Cloud. 2399 2400 This tool allows updating multiple connection settings in a single call: 2401 - Enable or disable the connection 2402 - Set a cron schedule for automatic syncs 2403 - Switch to manual scheduling (no automatic syncs) 2404 2405 At least one setting must be provided. The 'cron_expression' and 'manual_schedule' 2406 parameters are mutually exclusive. 2407 """ 2408 check_guid_created_in_session(connection_id) 2409 2410 # Validate that at least one setting is provided 2411 if enabled is None and cron_expression is None and manual_schedule is None: 2412 raise ValueError( 2413 "At least one setting must be provided: 'enabled', 'cron_expression', " 2414 "or 'manual_schedule'." 2415 ) 2416 2417 # Validate mutually exclusive schedule options 2418 if cron_expression is not None and manual_schedule is True: 2419 raise ValueError( 2420 "Cannot specify both 'cron_expression' and 'manual_schedule=True'. " 2421 "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' " 2422 "for manual-only syncs." 2423 ) 2424 2425 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2426 connection = workspace.get_connection(connection_id=connection_id) 2427 2428 changes_made: list[str] = [] 2429 2430 # Apply enabled status change 2431 if enabled is not None: 2432 connection.set_enabled(enabled=enabled) 2433 status_str = "enabled" if enabled else "disabled" 2434 changes_made.append(f"status set to '{status_str}'") 2435 2436 # Apply schedule change 2437 if cron_expression is not None: 2438 connection.set_schedule(cron_expression=cron_expression) 2439 changes_made.append(f"schedule set to '{cron_expression}'") 2440 elif manual_schedule is True: 2441 connection.set_manual_schedule() 2442 changes_made.append("schedule set to 'manual'") 2443 2444 changes_summary = ", ".join(changes_made) 2445 return ( 2446 f"Successfully updated connection '{connection_id}': {changes_summary}. " 2447 f"URL: {connection.connection_url}" 2448 )
Update a connection's settings on Airbyte Cloud.
This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)
At least one setting must be provided. The 'cron_expression' and 'manual_schedule' parameters are mutually exclusive.
2451@mcp_tool( 2452 domain="cloud", 2453 read_only=True, 2454 idempotent=True, 2455 open_world=True, 2456 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2457) 2458def get_connection_artifact( 2459 connection_id: Annotated[ 2460 str, 2461 Field(description="The ID of the Airbyte Cloud connection."), 2462 ], 2463 artifact_type: Annotated[ 2464 Literal["state", "catalog"], 2465 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2466 ], 2467 *, 2468 workspace_id: Annotated[ 2469 str | None, 2470 Field( 2471 description=WORKSPACE_ID_TIP_TEXT, 2472 default=None, 2473 ), 2474 ], 2475) -> dict[str, Any] | list[dict[str, Any]]: 2476 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2477 2478 Retrieves the specified artifact for a connection: 2479 - 'state': Returns the persisted state for incremental syncs as a list of 2480 stream state objects, or {"ERROR": "..."} if no state is set. 2481 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2482 or {"ERROR": "..."} if not found. 2483 """ 2484 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2485 connection = workspace.get_connection(connection_id=connection_id) 2486 2487 if artifact_type == "state": 2488 result = connection.get_state_artifacts() 2489 if result is None: 2490 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2491 return result 2492 2493 # artifact_type == "catalog" 2494 result = connection.get_catalog_artifact() 2495 if result is None: 2496 return {"ERROR": "No catalog found for this connection"} 2497 return result
Get a connection artifact (state or catalog) from Airbyte Cloud.
Retrieves the specified artifact for a connection:
- 'state': Returns the persisted state for incremental syncs as a list of stream state objects, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.