airbyte.mcp.cloud_ops
Airbyte Cloud MCP operations.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from pathlib import Path
from typing import Annotated, Any

from fastmcp import FastMCP
from pydantic import Field

from airbyte import cloud, get_destination, get_source
from airbyte.cloud.auth import (
    resolve_cloud_api_url,
    resolve_cloud_client_id,
    resolve_cloud_client_secret,
    resolve_cloud_workspace_id,
)
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import CloudDestination, CloudSource, CustomCloudSourceDefinition
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.mcp._tool_utils import (
    check_guid_created_in_session,
    mcp_tool,
    register_guid_created_in_session,
    register_tools,
)
from airbyte.mcp._util import resolve_config, resolve_list_of_strings


# NOTE(review): the docstrings of the `@mcp_tool` functions below are presumably surfaced to
# MCP clients as tool descriptions (confirm against `mcp_tool` / `register_tools`). Treat them
# as user-facing text when editing.


def _get_cloud_workspace() -> CloudWorkspace:
    """Get an authenticated CloudWorkspace using environment variables."""
    return CloudWorkspace(
        workspace_id=resolve_cloud_workspace_id(),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=resolve_cloud_api_url(),
    )


def _resolve_manifest_input(manifest_yaml: str | Path | None) -> str | Path | None:
    """Interpret a manifest argument as either inline YAML text or a file path.

    A string without any newline is treated as a file path and wrapped in `Path`;
    every other value (multi-line YAML text, an existing `Path`, or `None`) is
    returned unchanged.
    """
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        return Path(manifest_yaml)
    return manifest_yaml


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        source = get_source(
            source_connector_name,
            install_if_missing=False,
        )
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=source.config_spec,
        )
        source.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_source = workspace.deploy_source(
            name=source_name,
            source=source,
            unique=unique,
        )

    except Exception as ex:
        # Failures are reported as text rather than raised.
        return f"Failed to deploy source '{source_name}': {ex}"
    else:
        # Record the new ID so that session-guarded tools accept it later.
        register_guid_created_in_session(deployed_source.connector_id)
        return (
            f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
            f" and URL: {deployed_source.connector_url}"
        )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        destination = get_destination(
            destination_connector_name,
            install_if_missing=False,
        )
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=destination.config_spec,
        )
        destination.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=destination_name,
            destination=destination,
            unique=unique,
        )

    except Exception as ex:
        return f"Failed to deploy destination '{destination_name}': {ex}"
    else:
        register_guid_created_in_session(deployed_destination.connector_id)
        # Report the URL as well, matching the source and no-op destination deploy tools.
        return (
            f"Successfully deployed destination '{destination_name}' "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Resolved outside the `try`, so a failure here propagates instead of becoming text.
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_connection = workspace.deploy_connection(
            connection_name=connection_name,
            source=source_id,
            destination=destination_id,
            selected_streams=resolved_streams_list,
            table_prefix=table_prefix,
        )

    except Exception as ex:
        return f"Failed to create connection '{connection_name}': {ex}"
    else:
        register_guid_created_in_session(deployed_connection.connection_id)
        return (
            f"Successfully created connection '{connection_name}' "
            f"with ID '{deployed_connection.connection_id}' and "
            f"URL: {deployed_connection.connection_url}"
        )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    wait: Annotated[
        bool,
        Field(
            description="Whether to wait for the sync to complete.",
            default=True,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)
        sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    except Exception as ex:
        return f"Failed to run sync for connection '{connection_id}': {ex}"
    else:
        if wait:
            # The job status is only queried when the caller asked to wait.
            status = sync_result.get_job_status()
            return (
                f"Sync completed with status: {status}. "
                f"Job ID is '{sync_result.job_id}' and "
                f"job URL is: {sync_result.job_url}"
            )
        return (
            f"Sync started. "
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def check_airbyte_cloud_workspace() -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns workspace ID and workspace URL for verification.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        workspace.connect()

    except Exception as ex:
        return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}"
    else:
        return (
            f"✅ Successfully connected to Airbyte Cloud workspace.\n"
            f"Workspace ID: {workspace.workspace_id}\n"
            f"Workspace URL: {workspace.workspace_url}"
        )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        destination = get_noop_destination()
        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=name,
            destination=destination,
            unique=unique,
        )
    except Exception as ex:
        return f"Failed to deploy No-op Destination: {ex}"
    else:
        register_guid_created_in_session(deployed_destination.connector_id)
        return (
            f"Successfully deployed No-op Destination "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)

        # `job_id` is passed through unconditionally; it may be None, in which case
        # (per this tool's parameter description) the latest job is used.
        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

        if not sync_result:
            return {"status": None, "job_id": None, "attempts": []}

        result = {
            "status": sync_result.get_job_status(),
            "job_id": sync_result.job_id,
            "bytes_synced": sync_result.bytes_synced,
            "records_synced": sync_result.records_synced,
            "start_time": sync_result.start_time.isoformat(),
            "job_url": sync_result.job_url,
            "attempts": [],
        }

        if include_attempts:
            # Attempts are fetched only on request; otherwise the list stays empty.
            attempts = sync_result.get_attempts()
            result["attempts"] = [
                {
                    "attempt_number": attempt.attempt_number,
                    "attempt_id": attempt.attempt_id,
                    "status": attempt.status,
                    "bytes_synced": attempt.bytes_synced,
                    "records_synced": attempt.records_synced,
                    "created_at": attempt.created_at.isoformat(),
                }
                for attempt in attempts
            ]

        return result  # noqa: TRY300

    except Exception as ex:
        # Errors are reported inside the result dict under the "error" key.
        return {
            "status": None,
            "job_id": job_id,
            "error": f"Failed to get sync status for connection '{connection_id}': {ex}",
            "attempts": [],
        }


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_source_connectors() -> list[CloudSource]:
    """List all deployed source connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_sources()


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
    """List all deployed destination connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_destinations()


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
) -> str:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        workspace: CloudWorkspace = _get_cloud_workspace()
        connection = workspace.get_connection(connection_id=connection_id)

        sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

        if not sync_result:
            return f"No sync job found for connection '{connection_id}'"

        attempts = sync_result.get_attempts()

        if not attempts:
            return f"No attempts found for job '{sync_result.job_id}'"

        if attempt_number is not None:
            # First attempt whose number matches the request, or None if there is none.
            target_attempt = next(
                (attempt for attempt in attempts if attempt.attempt_number == attempt_number),
                None,
            )

            if target_attempt is None:
                return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'"
        else:
            # No explicit attempt requested: use the one with the highest attempt number.
            target_attempt = max(attempts, key=lambda a: a.attempt_number)

        logs = target_attempt.get_full_log_text()

        if not logs:
            return (
                f"No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}"
            )

        return logs  # noqa: TRY300

    except Exception as ex:
        return f"Failed to get logs for connection '{connection_id}': {ex}"


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_connections() -> list[CloudConnection]:
    """List all deployed connections in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    return workspace.list_connections()


def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    """Render a custom source definition as a newline-joined, bulleted summary."""
    return "\n".join(
        [
            f" - Custom Source Name: {custom_source.name}",
            f" - Definition ID: {custom_source.definition_id}",
            f" - Definition Version: {custom_source.version}",
            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
        ]
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    try:
        # A single-line string is treated as a path to the manifest file.
        processed_manifest = _resolve_manifest_input(manifest_yaml)

        workspace: CloudWorkspace = _get_cloud_workspace()
        custom_source = workspace.publish_custom_source_definition(
            name=name,
            manifest_yaml=processed_manifest,
            unique=unique,
            pre_validate=pre_validate,
        )
    except Exception as ex:
        return f"Failed to publish custom source definition '{name}': {ex}"
    else:
        register_guid_created_in_session(custom_source.definition_id)
        return (
            "Successfully published custom YAML source definition:\n"
            + _get_custom_source_definition_description(
                custom_source=custom_source,
            )
            + "\n"
        )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions() -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path,
        Field(
            description="New manifest as YAML string or file path.",
        ),
    ],
    *,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Session guard runs outside the `try`, so anything it raises propagates to the caller.
    check_guid_created_in_session(definition_id)
    try:
        # A single-line string is treated as a path to the manifest file.
        processed_manifest = _resolve_manifest_input(manifest_yaml)

        workspace: CloudWorkspace = _get_cloud_workspace()
        definition = workspace.get_custom_source_definition(
            definition_id=definition_id,
            definition_type="yaml",
        )
        custom_source: CustomCloudSourceDefinition = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )
    except Exception as ex:
        return f"Failed to update custom source definition '{definition_id}': {ex}"
    else:
        return (
            "Successfully updated custom YAML source definition:\n"
            + _get_custom_source_definition_description(
                custom_source=custom_source,
            )
        )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to either:
    1. Start with "delete:" (case insensitive), OR
    2. Contain "delete-me" (case insensitive)

    If the connector does not meet these requirements, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    definition_name: str = definition.name  # Capture name before deletion
    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
    )
    return (
        f"Successfully deleted custom source definition '{definition_name}' (ID: {definition_id})"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    source = workspace.get_source(source_id=source_id)
    source.rename(name=name)
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    source = workspace.get_source(source_id=source_id)

    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    source.update_config(config=config_dict)
    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    workspace: CloudWorkspace = _get_cloud_workspace()
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )


def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.

    Tools are filtered based on mode settings:
    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
      operations are protected by runtime session checks
    """
    register_tools(app, domain="cloud")
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Build and configure the connector, then deploy it. Any failure in this
    # phase is reported to the caller as text rather than raised.
    try:
        connector = get_source(source_connector_name, install_if_missing=False)
        connector.set_config(
            resolve_config(
                config=config,
                config_secret_name=config_secret_name,
                config_spec_jsonschema=connector.config_spec,
            )
        )
        cloud_workspace: CloudWorkspace = _get_cloud_workspace()
        deployed = cloud_workspace.deploy_source(
            name=source_name,
            source=connector,
            unique=unique,
        )
    except Exception as ex:
        return f"Failed to deploy source '{source_name}': {ex}"

    # Success path (outside the `try`, so errors here propagate): record the
    # new ID for later session-guarded tools and report where it lives.
    register_guid_created_in_session(deployed.connector_id)
    success_message = (
        f"Successfully deployed source '{source_name}' with ID '{deployed.connector_id}'"
        f" and URL: {deployed.connector_url}"
    )
    return success_message
Deploy a source connector to Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Returns a human-readable status string in both the success and failure
    # cases; exceptions raised while building, configuring, or deploying the
    # connector are converted into the failure message instead of propagating.
    try:
        destination = get_destination(
            destination_connector_name,
            install_if_missing=False,
        )
        # The connector's own spec is passed along when resolving the config.
        config_dict = resolve_config(
            config=config,
            config_secret_name=config_secret_name,
            config_spec_jsonschema=destination.config_spec,
        )
        destination.set_config(config_dict)

        workspace: CloudWorkspace = _get_cloud_workspace()
        deployed_destination = workspace.deploy_destination(
            name=destination_name,
            destination=destination,
            unique=unique,
        )

    except Exception as ex:
        return f"Failed to deploy destination '{destination_name}': {ex}"
    else:
        # Record the new ID so session-guarded tools accept it later.
        register_guid_created_in_session(deployed_destination.connector_id)
        # Report the ID and the URL, matching the message shape used when
        # deploying a source, so the caller can link straight to the connector.
        return (
            f"Successfully deployed destination '{destination_name}' "
            f"with ID '{deployed_destination.connector_id}' and "
            f"URL: {deployed_destination.connector_url}"
        )
Deploy a destination connector to Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Stream resolution happens before the guarded section, so an error raised
    # here propagates to the caller instead of becoming a failure message.
    stream_list: list[str] = resolve_list_of_strings(selected_streams)

    try:
        new_connection = _get_cloud_workspace().deploy_connection(
            connection_name=connection_name,
            source=source_id,
            destination=destination_id,
            selected_streams=stream_list,
            table_prefix=table_prefix,
        )
    except Exception as err:
        return f"Failed to create connection '{connection_name}': {err}"

    # Record the new ID so session-guarded tools accept it later.
    register_guid_created_in_session(new_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{new_connection.connection_id}' and "
        f"URL: {new_connection.connection_url}"
    )
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    wait: Annotated[
        bool,
        Field(
            description="Whether to wait for the sync to complete.",
            default=True,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        connection = _get_cloud_workspace().get_connection(connection_id=connection_id)
        job = connection.run_sync(wait=wait, wait_timeout=wait_timeout)
    except Exception as err:
        return f"Failed to run sync for connection '{connection_id}': {err}"

    # Only the opening sentence differs between the waited and fire-and-forget
    # cases; the job ID and URL suffix is shared.
    if wait:
        headline = f"Sync completed with status: {job.get_job_status()}. "
    else:
        headline = "Sync started. "
    return f"{headline}Job ID is '{job.job_id}' and job URL is: {job.job_url}"
Run a sync job on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def check_airbyte_cloud_workspace() -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.

    Returns workspace ID and workspace URL for verification.
    """
    # Both credential resolution and the connect call are guarded, so either
    # failure is reported as a message string.
    try:
        cloud_workspace = _get_cloud_workspace()
        cloud_workspace.connect()
    except Exception as err:
        return f"❌ Failed to connect to Airbyte Cloud workspace: {err}"

    report_lines = [
        "✅ Successfully connected to Airbyte Cloud workspace.",
        f"Workspace ID: {cloud_workspace.workspace_id}",
        f"Workspace URL: {cloud_workspace.workspace_url}",
    ]
    return "\n".join(report_lines)
Check if we have a valid Airbyte Cloud connection and return workspace info.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
Returns workspace ID and workspace URL for verification.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    try:
        noop_destination = get_noop_destination()
        cloud_destination = _get_cloud_workspace().deploy_destination(
            name=name,
            destination=noop_destination,
            unique=unique,
        )
    except Exception as err:
        return f"Failed to deploy No-op Destination: {err}"

    # Record the new ID so session-guarded tools accept it later.
    register_guid_created_in_session(cloud_destination.connector_id)
    return (
        "Successfully deployed No-op Destination "
        f"with ID '{cloud_destination.connector_id}' and "
        f"URL: {cloud_destination.connector_url}"
    )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Everything, including building the payload, stays inside the guarded
    # section so any failure yields the error-shaped dict below.
    try:
        connection = _get_cloud_workspace().get_connection(connection_id=connection_id)

        # `job_id` is forwarded unchanged; per the parameter description, a
        # missing ID selects the latest job.
        job: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
        if not job:
            return {"status": None, "job_id": None, "attempts": []}

        summary: dict[str, Any] = {
            "status": job.get_job_status(),
            "job_id": job.job_id,
            "bytes_synced": job.bytes_synced,
            "records_synced": job.records_synced,
            "start_time": job.start_time.isoformat(),
            "job_url": job.job_url,
            "attempts": [],
        }

        # Attempt details are fetched only on request.
        if include_attempts:
            attempt_rows = []
            for attempt in job.get_attempts():
                attempt_rows.append(
                    {
                        "attempt_number": attempt.attempt_number,
                        "attempt_id": attempt.attempt_id,
                        "status": attempt.status,
                        "bytes_synced": attempt.bytes_synced,
                        "records_synced": attempt.records_synced,
                        "created_at": attempt.created_at.isoformat(),
                    }
                )
            summary["attempts"] = attempt_rows

        return summary  # noqa: TRY300

    except Exception as err:
        return {
            "status": None,
            "job_id": job_id,
            "error": f"Failed to get sync status for connection '{connection_id}': {err}",
            "attempts": [],
        }
Get the status of a sync job from the Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_source_connectors() -> list[CloudSource]:
    """List all deployed source connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    return _get_cloud_workspace().list_sources()
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
    """List all deployed destination connectors in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    return _get_cloud_workspace().list_destinations()
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
) -> str:
    """Get the logs from a sync job attempt on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # The whole lookup is guarded, so every failure comes back as a message.
    try:
        connection = _get_cloud_workspace().get_connection(connection_id=connection_id)

        job: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
        if not job:
            return f"No sync job found for connection '{connection_id}'"

        job_attempts = job.get_attempts()
        if not job_attempts:
            return f"No attempts found for job '{job.job_id}'"

        if attempt_number is None:
            # No explicit attempt requested: use the highest attempt number.
            chosen_attempt = max(job_attempts, key=lambda a: a.attempt_number)
        else:
            # First attempt whose number matches, or None when there is none.
            chosen_attempt = next(
                (a for a in job_attempts if a.attempt_number == attempt_number),
                None,
            )
            if chosen_attempt is None:
                return f"Attempt number {attempt_number} not found for job '{job.job_id}'"

        log_text = chosen_attempt.get_full_log_text()
        if log_text:
            return log_text  # noqa: TRY300

        return (
            f"No logs available for job '{job.job_id}', "
            f"attempt {chosen_attempt.attempt_number}"
        )

    except Exception as err:
        return f"Failed to get logs for connection '{connection_id}': {err}"
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_deployed_cloud_connections() -> list[CloudConnection]:
    """List all deployed connections in the Airbyte Cloud workspace.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    return _get_cloud_workspace().list_connections()
List all deployed connections in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    try:
        # A single-line string is treated as a file path; multi-line strings,
        # Path objects, and None are passed through untouched.
        looks_like_path = isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        manifest_arg = Path(manifest_yaml) if looks_like_path else manifest_yaml

        published = _get_cloud_workspace().publish_custom_source_definition(
            name=name,
            manifest_yaml=manifest_arg,
            unique=unique,
            pre_validate=pre_validate,
        )
    except Exception as err:
        return f"Failed to publish custom source definition '{name}': {err}"

    # Record the new definition ID so session-guarded tools accept it later.
    register_guid_created_in_session(published.definition_id)
    description = _get_custom_source_definition_description(
        custom_source=published,
    )
    return "Successfully published custom YAML source definition:\n" + description + "\n"
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions() -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    yaml_definitions = _get_cloud_workspace().list_custom_source_definitions(
        definition_type="yaml",
    )

    # Flatten each definition into a plain dict of its identifying fields.
    summaries: list[dict[str, Any]] = []
    for definition in yaml_definitions:
        summaries.append(
            {
                "definition_id": definition.definition_id,
                "name": definition.name,
                "version": definition.version,
                "connector_builder_project_url": definition.connector_builder_project_url,
            }
        )
    return summaries
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path,
        Field(
            description="New manifest as YAML string or file path.",
        ),
    ],
    *,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # The session guard runs outside the guarded section, so whatever it
    # raises propagates to the caller.
    check_guid_created_in_session(definition_id)

    try:
        # A single-line string is treated as a file path; anything else is
        # passed through untouched.
        looks_like_path = isinstance(manifest_yaml, str) and "\n" not in manifest_yaml
        manifest_arg = Path(manifest_yaml) if looks_like_path else manifest_yaml

        existing = _get_cloud_workspace().get_custom_source_definition(
            definition_id=definition_id,
            definition_type="yaml",
        )
        updated: CustomCloudSourceDefinition = existing.update_definition(
            manifest_yaml=manifest_arg,
            pre_validate=pre_validate,
        )
    except Exception as err:
        return f"Failed to update custom source definition '{definition_id}': {err}"

    description = _get_custom_source_definition_description(
        custom_source=updated,
    )
    return "Successfully updated custom YAML source definition:\n" + description
Update a custom YAML source definition in Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to either:
    1. Start with "delete:" (case insensitive), OR
    2. Contain "delete-me" (case insensitive)

    If the connector does not meet these requirements, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)

    target = _get_cloud_workspace().get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    # Read the name up front; it is needed for the message after deletion.
    deleted_name: str = target.name

    # Safe mode is always on as extra protection when driven by LLM agents.
    target.permanently_delete(safe_mode=True)

    return f"Successfully deleted custom source definition '{deleted_name}' (ID: {definition_id})"
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to either:
- Start with "delete:" (case insensitive), OR
- Contain "delete-me" (case insensitive)
If the connector does not meet these requirements, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    cloud_source = _get_cloud_workspace().get_source(source_id=source_id)
    cloud_source.rename(name=name)

    connector_url = cloud_source.connector_url
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {connector_url}"
Rename a deployed source connector on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Session guard first; nothing is fetched or changed if it raises.
    check_guid_created_in_session(source_id)
    cloud_source = _get_cloud_workspace().get_source(source_id=source_id)

    # No connector spec is available at this point, so none is passed along.
    new_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_source.update_config(config=new_config)

    connector_url = cloud_source.connector_url
    return f"Successfully updated source '{source_id}'. URL: {connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    cloud_destination = _get_cloud_workspace().get_destination(destination_id=destination_id)
    cloud_destination.rename(name=name)

    connector_url = cloud_destination.connector_url
    return f"Successfully renamed destination '{destination_id}' to '{name}'. URL: {connector_url}"
Rename a deployed destination connector on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Session guard first; nothing is fetched or changed if it raises.
    check_guid_created_in_session(destination_id)
    cloud_destination = _get_cloud_workspace().get_destination(destination_id=destination_id)

    # No connector spec is available at this point, so none is passed along.
    new_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,
    )
    cloud_destination.update_config(config=new_config)

    connector_url = cloud_destination.connector_url
    return f"Successfully updated destination '{destination_id}'. URL: {connector_url}"
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    open_world=True,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # No error wrapping here: failures propagate to the caller.
    cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)
    cloud_connection.rename(name=name)

    connection_url = cloud_connection.connection_url
    return f"Successfully renamed connection '{connection_id}' to '{name}'. URL: {connection_url}"
Rename a connection on Airbyte Cloud.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Session guard first; nothing is fetched or changed if it raises.
    check_guid_created_in_session(connection_id)

    cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)
    cloud_connection.set_table_prefix(prefix=prefix)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection_url}"
    )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.

    By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
    and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
    Airbyte Cloud API.
    """
    # Session guard first; nothing is fetched or changed if it raises.
    check_guid_created_in_session(connection_id)
    cloud_connection = _get_cloud_workspace().get_connection(connection_id=connection_id)

    # Normalize the str-or-list input into a list before applying it.
    stream_list: list[str] = resolve_list_of_strings(stream_names)
    cloud_connection.set_selected_streams(stream_names=stream_list)

    connection_url = cloud_connection.connection_url
    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {stream_list}. URL: {connection_url}"
    )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLIENT_ID, AIRBYTE_CLIENT_SECRET, AIRBYTE_WORKSPACE_ID,
and AIRBYTE_API_ROOT environment variables will be used to authenticate with the
Airbyte Cloud API.