airbyte.mcp.cloud_ops
Airbyte Cloud MCP operations.
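The tools in this module authenticate through the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables. The following is a minimal sketch of serving these tools on a standalone FastMCP app; note that `register_cloud_ops_tools` is marked internal in this module (it is normally invoked for you when the PyAirbyte MCP server starts), and the app name and placeholder credential values below are illustrative only.

import os

from fastmcp import FastMCP

from airbyte.mcp.cloud_ops import register_cloud_ops_tools

# Placeholders only -- export real credentials in your environment instead.
os.environ.setdefault("AIRBYTE_CLOUD_CLIENT_ID", "<client-id>")
os.environ.setdefault("AIRBYTE_CLOUD_CLIENT_SECRET", "<client-secret>")
os.environ.setdefault("AIRBYTE_CLOUD_WORKSPACE_ID", "<workspace-id>")

app = FastMCP("airbyte-cloud-tools")  # arbitrary server name for this sketch
register_cloud_ops_tools(app)  # internal helper; shown here purely for illustration

if __name__ == "__main__":
    app.run()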
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from pathlib import Path
from typing import Annotated, Any, cast

from fastmcp import FastMCP
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte._util import api_util
from airbyte.cloud.auth import (
    resolve_cloud_api_url,
    resolve_cloud_client_id,
    resolve_cloud_client_secret,
    resolve_cloud_workspace_id,
)
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.constants import FAILED_STATUSES
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
from airbyte.mcp._tool_utils import (
    check_guid_created_in_session,
    mcp_tool,
    register_guid_created_in_session,
    register_tools,
)
from airbyte.mcp._util import resolve_config, resolve_list_of_strings
from airbyte.secrets import SecretString


CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."


class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""


class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""


class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""


class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""


class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""


class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""


class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    id: str
    """The workspace ID."""
    name: str
    """Display name of the workspace."""
    organization_id: str
    """ID of the organization this workspace belongs to."""


class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
    """Get an authenticated CloudWorkspace.

    Args:
        workspace_id: Optional workspace ID. If not provided, uses the
            AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
    """
    return CloudWorkspace(
        workspace_id=resolve_cloud_workspace_id(workspace_id),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=resolve_cloud_api_url(),
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    source = get_source(
        source_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = workspace.deploy_source(
        name=source_name,
        source=source,
        unique=unique,
    )

    register_guid_created_in_session(deployed_source.connector_id)
    return (
        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
        f" and URL: {deployed_source.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID: {deployed_destination.connector_id}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_connection = workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=resolved_streams_list,
        table_prefix=table_prefix,
    )

    register_guid_created_in_session(deployed_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{deployed_connection.connection_id}' and "
        f"URL: {deployed_connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    if wait:
        status = sync_result.get_job_status()
        return (
            f"Sync completed with status: {status}. "
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )
    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace ID and workspace URL for verification.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    workspace.connect()

    return (
        f"✅ Successfully connected to Airbyte Cloud workspace.\n"
        f"Workspace ID: {workspace.workspace_id}\n"
        f"Workspace URL: {workspace.workspace_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # If a job ID is provided, get the job by ID.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        return {"status": None, "job_id": None, "attempts": []}

    result = {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": [],
    }

    if include_attempts:
        attempts = sync_result.get_attempts()
        result["attempts"] = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in attempts
        ]

    return result


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    sources = workspace.list_sources()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        sources = sources[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=source.source_id,
            name=cast(str, source.name),
            url=cast(str, source.connector_url),
        )
        for source in sources
    ]


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destinations = workspace.list_destinations()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        destinations = destinations[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=destination.destination_id,
            name=cast(str, destination.name),
            url=cast(str, destination.connector_url),
        )
        for destination in destinations
    ]


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)

    # Access name property to ensure _connector_info is populated
    source_name = cast(str, source.name)

    return CloudSourceDetails(
        source_id=source.source_id,
        source_name=source_name,
        source_url=source.connector_url,
        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Access name property to ensure _connector_info is populated
    destination_name = cast(str, destination.name)

    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=destination_name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    return CloudConnectionDetails(
        connection_id=connection.connection_id,
        connection_name=cast(str, connection.name),
        connection_url=cast(str, connection.connection_url),
        source_id=connection.source_id,
        source_name=cast(str, connection.source.name),
        destination_id=connection.destination_id,
        destination_name=cast(str, connection.destination.name),
        selected_streams=connection.stream_names,
        table_prefix=connection.table_prefix,
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results


def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString,
    client_secret: SecretString,
) -> api_util.models.OrganizationResponse:
    """Resolve organization from either ID or exact name match.

    Args:
        organization_id: The organization ID (if provided directly)
        organization_name: The organization name (exact match required)
        api_root: The API root URL
        client_id: OAuth client ID
        client_secret: OAuth client secret

    Returns:
        The resolved OrganizationResponse object

    Raises:
        PyAirbyteInputError: If neither or both parameters are provided,
            or if no organization matches the exact name
        AirbyteMissingResourceError: If the organization is not found
    """
    if organization_id and organization_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not organization_id and not organization_name:
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Get all organizations for the user
    orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    )

    if organization_id:
        # Find by ID
        matching_orgs = [org for org in orgs if org.organization_id == organization_id]
        if not matching_orgs:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
        return matching_orgs[0]

    # Find by exact name match (case-sensitive)
    matching_orgs = [org for org in orgs if org.organization_name == organization_name]

    if not matching_orgs:
        raise AirbyteMissingResourceError(
            resource_type="organization",
            context={
                "organization_name": organization_name,
                "message": f"No organization found with exact name '{organization_name}' "
                "for the current user.",
            },
        )

    if len(matching_orgs) > 1:
        raise PyAirbyteInputError(
            message=f"Multiple organizations found with name '{organization_name}'. "
            "Please use 'organization_id' instead to specify the exact organization."
        )

    return matching_orgs[0]


def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString,
    client_secret: SecretString,
) -> str:
    """Resolve organization ID from either ID or exact name match.

    This is a convenience wrapper around _resolve_organization that returns just the ID.
    """
    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    )
    return org.organization_id


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    api_root = resolve_cloud_api_url()
    client_id = resolve_cloud_client_id()
    client_secret = resolve_cloud_client_secret()

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    )

    workspaces = api_util.list_workspaces_in_organization(
        organization_id=resolved_org_id,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [
        CloudWorkspaceResult(
            id=ws.get("workspaceId", ""),
            name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    api_root = resolve_cloud_api_url()
    client_id = resolve_cloud_client_id()
    client_secret = resolve_cloud_client_secret()

    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
    )

    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
    )


def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    return "\n".join(
        [
            f" - Custom Source Name: {custom_source.name}",
            f" - Definition ID: {custom_source.definition_id}",
            f" - Definition Version: {custom_source.version}",
            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
        ]
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
    )
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path,
        Field(
            description="New manifest as YAML string or file path.",
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition.update_definition(
        manifest_yaml=processed_manifest,
        pre_validate=pre_validate,
    )
    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = definition.name

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
    )
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace()
    connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, connection.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_connection(
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)
    source.rename(name=name)
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)

    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    source.update_config(config=config_dict)
    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )


def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.

    Tools are filtered based on mode settings:
    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
      operations are protected by runtime session checks
    """
    register_tools(app, domain="cloud")
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
Information about a deployed source connector in Airbyte Cloud.
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
Information about a deployed destination connector in Airbyte Cloud.
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
Information about a deployed connection in Airbyte Cloud.
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.
Job ID of the most recent completed sync. Only populated when with_connection_status=True.
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
Detailed information about a deployed source connector in Airbyte Cloud.
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
Detailed information about a deployed destination connector in Airbyte Cloud.
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
Detailed information about a deployed connection in Airbyte Cloud.
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
Information about an organization in Airbyte Cloud.
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    id: str
    """The workspace ID."""
    name: str
    """Display name of the workspace."""
    organization_id: str
    """ID of the organization this workspace belongs to."""
Information about a workspace in Airbyte Cloud.
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text being returned."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines being returned."""
    total_log_lines_available: int
    """Total number of log lines available; if greater than log_text_line_count, some lines
    were omitted due to the limit."""
Result of reading sync logs with pagination support.
196@mcp_tool( 197 domain="cloud", 198 open_world=True, 199 extra_help_text=CLOUD_AUTH_TIP_TEXT, 200) 201def deploy_source_to_cloud( 202 source_name: Annotated[ 203 str, 204 Field(description="The name to use when deploying the source."), 205 ], 206 source_connector_name: Annotated[ 207 str, 208 Field(description="The name of the source connector (e.g., 'source-faker')."), 209 ], 210 *, 211 workspace_id: Annotated[ 212 str | None, 213 Field( 214 description=WORKSPACE_ID_TIP_TEXT, 215 default=None, 216 ), 217 ], 218 config: Annotated[ 219 dict | str | None, 220 Field( 221 description="The configuration for the source connector.", 222 default=None, 223 ), 224 ], 225 config_secret_name: Annotated[ 226 str | None, 227 Field( 228 description="The name of the secret containing the configuration.", 229 default=None, 230 ), 231 ], 232 unique: Annotated[ 233 bool, 234 Field( 235 description="Whether to require a unique name.", 236 default=True, 237 ), 238 ], 239) -> str: 240 """Deploy a source connector to Airbyte Cloud.""" 241 source = get_source( 242 source_connector_name, 243 no_executor=True, 244 ) 245 config_dict = resolve_config( 246 config=config, 247 config_secret_name=config_secret_name, 248 config_spec_jsonschema=source.config_spec, 249 ) 250 source.set_config(config_dict, validate=True) 251 252 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 253 deployed_source = workspace.deploy_source( 254 name=source_name, 255 source=source, 256 unique=unique, 257 ) 258 259 register_guid_created_in_session(deployed_source.connector_id) 260 return ( 261 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 262 f" and URL: {deployed_source.connector_url}" 263 )
Deploy a source connector to Airbyte Cloud.
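Example (illustrative sketch, not from the module source): deploying a source by calling the decorated function directly, assuming @mcp_tool leaves it callable as a plain Python function. Because most optional parameters are keyword-only with their defaults carried in Field metadata, they are passed explicitly here; the config values are placeholders.

from airbyte.mcp.cloud_ops import deploy_source_to_cloud

result_message = deploy_source_to_cloud(
    source_name="My Faker Source (delete-me)",
    source_connector_name="source-faker",
    workspace_id=None,        # fall back to the workspace resolved from the environment
    config={"count": 1_000},  # illustrative config; validated against the connector spec via set_config()
    config_secret_name=None,
    unique=True,
)
print(result_message)  # e.g. "Successfully deployed source ... with ID ... and URL: ..."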
266@mcp_tool( 267 domain="cloud", 268 open_world=True, 269 extra_help_text=CLOUD_AUTH_TIP_TEXT, 270) 271def deploy_destination_to_cloud( 272 destination_name: Annotated[ 273 str, 274 Field(description="The name to use when deploying the destination."), 275 ], 276 destination_connector_name: Annotated[ 277 str, 278 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 279 ], 280 *, 281 workspace_id: Annotated[ 282 str | None, 283 Field( 284 description=WORKSPACE_ID_TIP_TEXT, 285 default=None, 286 ), 287 ], 288 config: Annotated[ 289 dict | str | None, 290 Field( 291 description="The configuration for the destination connector.", 292 default=None, 293 ), 294 ], 295 config_secret_name: Annotated[ 296 str | None, 297 Field( 298 description="The name of the secret containing the configuration.", 299 default=None, 300 ), 301 ], 302 unique: Annotated[ 303 bool, 304 Field( 305 description="Whether to require a unique name.", 306 default=True, 307 ), 308 ], 309) -> str: 310 """Deploy a destination connector to Airbyte Cloud.""" 311 destination = get_destination( 312 destination_connector_name, 313 no_executor=True, 314 ) 315 config_dict = resolve_config( 316 config=config, 317 config_secret_name=config_secret_name, 318 config_spec_jsonschema=destination.config_spec, 319 ) 320 destination.set_config(config_dict, validate=True) 321 322 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 323 deployed_destination = workspace.deploy_destination( 324 name=destination_name, 325 destination=destination, 326 unique=unique, 327 ) 328 329 register_guid_created_in_session(deployed_destination.connector_id) 330 return ( 331 f"Successfully deployed destination '{destination_name}' " 332 f"with ID: {deployed_destination.connector_id}" 333 )
Deploy a destination connector to Airbyte Cloud.
336@mcp_tool( 337 domain="cloud", 338 open_world=True, 339 extra_help_text=CLOUD_AUTH_TIP_TEXT, 340) 341def create_connection_on_cloud( 342 connection_name: Annotated[ 343 str, 344 Field(description="The name of the connection."), 345 ], 346 source_id: Annotated[ 347 str, 348 Field(description="The ID of the deployed source."), 349 ], 350 destination_id: Annotated[ 351 str, 352 Field(description="The ID of the deployed destination."), 353 ], 354 selected_streams: Annotated[ 355 str | list[str], 356 Field( 357 description=( 358 "The selected stream names to sync within the connection. " 359 "Must be an explicit stream name or list of streams. " 360 "Cannot be empty or '*'." 361 ) 362 ), 363 ], 364 *, 365 workspace_id: Annotated[ 366 str | None, 367 Field( 368 description=WORKSPACE_ID_TIP_TEXT, 369 default=None, 370 ), 371 ], 372 table_prefix: Annotated[ 373 str | None, 374 Field( 375 description="Optional table prefix to use when syncing to the destination.", 376 default=None, 377 ), 378 ], 379) -> str: 380 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 381 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 382 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 383 deployed_connection = workspace.deploy_connection( 384 connection_name=connection_name, 385 source=source_id, 386 destination=destination_id, 387 selected_streams=resolved_streams_list, 388 table_prefix=table_prefix, 389 ) 390 391 register_guid_created_in_session(deployed_connection.connection_id) 392 return ( 393 f"Successfully created connection '{connection_name}' " 394 f"with ID '{deployed_connection.connection_id}' and " 395 f"URL: {deployed_connection.connection_url}" 396 )
Create a connection between a deployed source and destination on Airbyte Cloud.
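A usage sketch under the assumption that the source and destination IDs come from earlier deploy calls; all IDs, stream names, and the prefix below are placeholders.

from airbyte.mcp.cloud_ops import create_connection_on_cloud

message = create_connection_on_cloud(
    connection_name="faker-to-noop (delete-me)",
    source_id="11111111-1111-1111-1111-111111111111",        # placeholder source ID
    destination_id="22222222-2222-2222-2222-222222222222",   # placeholder destination ID
    selected_streams=["users", "products"],                   # explicit stream names; '*' is not accepted
    workspace_id=None,
    table_prefix="faker_",
)
print(message)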
399@mcp_tool( 400 domain="cloud", 401 open_world=True, 402 extra_help_text=CLOUD_AUTH_TIP_TEXT, 403) 404def run_cloud_sync( 405 connection_id: Annotated[ 406 str, 407 Field(description="The ID of the Airbyte Cloud connection."), 408 ], 409 *, 410 workspace_id: Annotated[ 411 str | None, 412 Field( 413 description=WORKSPACE_ID_TIP_TEXT, 414 default=None, 415 ), 416 ], 417 wait: Annotated[ 418 bool, 419 Field( 420 description=( 421 "Whether to wait for the sync to complete. Since a sync can take between several " 422 "minutes and several hours, this option is not recommended for most " 423 "scenarios." 424 ), 425 default=False, 426 ), 427 ], 428 wait_timeout: Annotated[ 429 int, 430 Field( 431 description="Maximum time to wait for sync completion (seconds).", 432 default=300, 433 ), 434 ], 435) -> str: 436 """Run a sync job on Airbyte Cloud.""" 437 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 438 connection = workspace.get_connection(connection_id=connection_id) 439 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 440 441 if wait: 442 status = sync_result.get_job_status() 443 return ( 444 f"Sync completed with status: {status}. " 445 f"Job ID is '{sync_result.job_id}' and " 446 f"job URL is: {sync_result.job_url}" 447 ) 448 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
Run a sync job on Airbyte Cloud.
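A sketch of a fire-and-forget sync followed by a later status check (the connection ID is a placeholder, and direct invocation of the decorated functions is assumed):

from airbyte.mcp.cloud_ops import get_cloud_sync_status, run_cloud_sync

connection_id = "33333333-3333-3333-3333-333333333333"  # placeholder

# Kick off the sync without blocking (recommended, since syncs can take hours).
print(run_cloud_sync(connection_id, workspace_id=None, wait=False, wait_timeout=300))

# Later, poll the latest job on the connection.
status = get_cloud_sync_status(connection_id, job_id=None, workspace_id=None, include_attempts=False)
print(status.get("status"), status.get("job_url"))

Polling keeps each tool call short-lived instead of holding a request open for a potentially hours-long job.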
451@mcp_tool( 452 domain="cloud", 453 read_only=True, 454 idempotent=True, 455 open_world=True, 456 extra_help_text=CLOUD_AUTH_TIP_TEXT, 457) 458def check_airbyte_cloud_workspace( 459 *, 460 workspace_id: Annotated[ 461 str | None, 462 Field( 463 description=WORKSPACE_ID_TIP_TEXT, 464 default=None, 465 ), 466 ], 467) -> str: 468 """Check if we have a valid Airbyte Cloud connection and return workspace info. 469 470 Returns workspace ID and workspace URL for verification. 471 """ 472 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 473 workspace.connect() 474 475 return ( 476 f"✅ Successfully connected to Airbyte Cloud workspace.\n" 477 f"Workspace ID: {workspace.workspace_id}\n" 478 f"Workspace URL: {workspace.workspace_url}" 479 )
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace ID and workspace URL for verification.
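A quick connectivity check, assuming cloud credentials are already available in the environment; passing workspace_id=None defers to the environment-resolved workspace.

from airbyte.mcp.cloud_ops import check_airbyte_cloud_workspace

# Raises on authentication or connection problems; otherwise returns a summary string.
print(check_airbyte_cloud_workspace(workspace_id=None))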
482@mcp_tool( 483 domain="cloud", 484 open_world=True, 485 extra_help_text=CLOUD_AUTH_TIP_TEXT, 486) 487def deploy_noop_destination_to_cloud( 488 name: str = "No-op Destination", 489 *, 490 workspace_id: Annotated[ 491 str | None, 492 Field( 493 description=WORKSPACE_ID_TIP_TEXT, 494 default=None, 495 ), 496 ], 497 unique: bool = True, 498) -> str: 499 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 500 destination = get_noop_destination() 501 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 502 deployed_destination = workspace.deploy_destination( 503 name=name, 504 destination=destination, 505 unique=unique, 506 ) 507 register_guid_created_in_session(deployed_destination.connector_id) 508 return ( 509 f"Successfully deployed No-op Destination " 510 f"with ID '{deployed_destination.connector_id}' and " 511 f"URL: {deployed_destination.connector_url}" 512 )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
515@mcp_tool( 516 domain="cloud", 517 read_only=True, 518 idempotent=True, 519 open_world=True, 520 extra_help_text=CLOUD_AUTH_TIP_TEXT, 521) 522def get_cloud_sync_status( 523 connection_id: Annotated[ 524 str, 525 Field( 526 description="The ID of the Airbyte Cloud connection.", 527 ), 528 ], 529 job_id: Annotated[ 530 int | None, 531 Field( 532 description="Optional job ID. If not provided, the latest job will be used.", 533 default=None, 534 ), 535 ], 536 *, 537 workspace_id: Annotated[ 538 str | None, 539 Field( 540 description=WORKSPACE_ID_TIP_TEXT, 541 default=None, 542 ), 543 ], 544 include_attempts: Annotated[ 545 bool, 546 Field( 547 description="Whether to include detailed attempts information.", 548 default=False, 549 ), 550 ], 551) -> dict[str, Any]: 552 """Get the status of a sync job from the Airbyte Cloud.""" 553 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 554 connection = workspace.get_connection(connection_id=connection_id) 555 556 # If a job ID is provided, get the job by ID. 557 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 558 559 if not sync_result: 560 return {"status": None, "job_id": None, "attempts": []} 561 562 result = { 563 "status": sync_result.get_job_status(), 564 "job_id": sync_result.job_id, 565 "bytes_synced": sync_result.bytes_synced, 566 "records_synced": sync_result.records_synced, 567 "start_time": sync_result.start_time.isoformat(), 568 "job_url": sync_result.job_url, 569 "attempts": [], 570 } 571 572 if include_attempts: 573 attempts = sync_result.get_attempts() 574 result["attempts"] = [ 575 { 576 "attempt_number": attempt.attempt_number, 577 "attempt_id": attempt.attempt_id, 578 "status": attempt.status, 579 "bytes_synced": attempt.bytes_synced, 580 "records_synced": attempt.records_synced, 581 "created_at": attempt.created_at.isoformat(), 582 } 583 for attempt in attempts 584 ] 585 586 return result
Get the status of a sync job from the Airbyte Cloud.
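A sketch that inspects a specific job and its attempts; the connection and job IDs are placeholders.

from airbyte.mcp.cloud_ops import get_cloud_sync_status

status = get_cloud_sync_status(
    "33333333-3333-3333-3333-333333333333",  # placeholder connection ID
    job_id=12345678,                          # pass None to use the latest job
    workspace_id=None,
    include_attempts=True,
)
for attempt in status["attempts"]:
    print(attempt["attempt_number"], attempt["status"], attempt["records_synced"])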
589@mcp_tool( 590 domain="cloud", 591 read_only=True, 592 idempotent=True, 593 open_world=True, 594 extra_help_text=CLOUD_AUTH_TIP_TEXT, 595) 596def list_deployed_cloud_source_connectors( 597 *, 598 workspace_id: Annotated[ 599 str | None, 600 Field( 601 description=WORKSPACE_ID_TIP_TEXT, 602 default=None, 603 ), 604 ], 605 name_contains: Annotated[ 606 str | None, 607 Field( 608 description="Optional case-insensitive substring to filter sources by name", 609 default=None, 610 ), 611 ], 612 max_items_limit: Annotated[ 613 int | None, 614 Field( 615 description="Optional maximum number of items to return (default: no limit)", 616 default=None, 617 ), 618 ], 619) -> list[CloudSourceResult]: 620 """List all deployed source connectors in the Airbyte Cloud workspace.""" 621 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 622 sources = workspace.list_sources() 623 624 # Filter by name if requested 625 if name_contains: 626 needle = name_contains.lower() 627 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 628 629 # Apply limit if requested 630 if max_items_limit is not None: 631 sources = sources[:max_items_limit] 632 633 # Note: name and url are guaranteed non-null from list API responses 634 return [ 635 CloudSourceResult( 636 id=source.source_id, 637 name=cast(str, source.name), 638 url=cast(str, source.connector_url), 639 ) 640 for source in sources 641 ]
List all deployed source connectors in the Airbyte Cloud workspace.
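A sketch showing the client-side name filter and item limit; the substring is only illustrative.

from airbyte.mcp.cloud_ops import list_deployed_cloud_source_connectors

sources = list_deployed_cloud_source_connectors(
    workspace_id=None,
    name_contains="faker",   # case-insensitive substring match
    max_items_limit=10,
)
for source in sources:
    print(source.id, source.name, source.url)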
644@mcp_tool( 645 domain="cloud", 646 read_only=True, 647 idempotent=True, 648 open_world=True, 649 extra_help_text=CLOUD_AUTH_TIP_TEXT, 650) 651def list_deployed_cloud_destination_connectors( 652 *, 653 workspace_id: Annotated[ 654 str | None, 655 Field( 656 description=WORKSPACE_ID_TIP_TEXT, 657 default=None, 658 ), 659 ], 660 name_contains: Annotated[ 661 str | None, 662 Field( 663 description="Optional case-insensitive substring to filter destinations by name", 664 default=None, 665 ), 666 ], 667 max_items_limit: Annotated[ 668 int | None, 669 Field( 670 description="Optional maximum number of items to return (default: no limit)", 671 default=None, 672 ), 673 ], 674) -> list[CloudDestinationResult]: 675 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 676 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 677 destinations = workspace.list_destinations() 678 679 # Filter by name if requested 680 if name_contains: 681 needle = name_contains.lower() 682 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 683 684 # Apply limit if requested 685 if max_items_limit is not None: 686 destinations = destinations[:max_items_limit] 687 688 # Note: name and url are guaranteed non-null from list API responses 689 return [ 690 CloudDestinationResult( 691 id=destination.destination_id, 692 name=cast(str, destination.name), 693 url=cast(str, destination.connector_url), 694 ) 695 for destination in destinations 696 ]
List all deployed destination connectors in the Airbyte Cloud workspace.
699@mcp_tool( 700 domain="cloud", 701 read_only=True, 702 idempotent=True, 703 open_world=True, 704 extra_help_text=CLOUD_AUTH_TIP_TEXT, 705) 706def describe_cloud_source( 707 source_id: Annotated[ 708 str, 709 Field(description="The ID of the source to describe."), 710 ], 711 *, 712 workspace_id: Annotated[ 713 str | None, 714 Field( 715 description=WORKSPACE_ID_TIP_TEXT, 716 default=None, 717 ), 718 ], 719) -> CloudSourceDetails: 720 """Get detailed information about a specific deployed source connector.""" 721 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 722 source = workspace.get_source(source_id=source_id) 723 724 # Access name property to ensure _connector_info is populated 725 source_name = cast(str, source.name) 726 727 return CloudSourceDetails( 728 source_id=source.source_id, 729 source_name=source_name, 730 source_url=source.connector_url, 731 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 732 )
Get detailed information about a specific deployed source connector.
735@mcp_tool( 736 domain="cloud", 737 read_only=True, 738 idempotent=True, 739 open_world=True, 740 extra_help_text=CLOUD_AUTH_TIP_TEXT, 741) 742def describe_cloud_destination( 743 destination_id: Annotated[ 744 str, 745 Field(description="The ID of the destination to describe."), 746 ], 747 *, 748 workspace_id: Annotated[ 749 str | None, 750 Field( 751 description=WORKSPACE_ID_TIP_TEXT, 752 default=None, 753 ), 754 ], 755) -> CloudDestinationDetails: 756 """Get detailed information about a specific deployed destination connector.""" 757 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 758 destination = workspace.get_destination(destination_id=destination_id) 759 760 # Access name property to ensure _connector_info is populated 761 destination_name = cast(str, destination.name) 762 763 return CloudDestinationDetails( 764 destination_id=destination.destination_id, 765 destination_name=destination_name, 766 destination_url=destination.connector_url, 767 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 768 )
Get detailed information about a specific deployed destination connector.
771@mcp_tool( 772 domain="cloud", 773 read_only=True, 774 idempotent=True, 775 open_world=True, 776 extra_help_text=CLOUD_AUTH_TIP_TEXT, 777) 778def describe_cloud_connection( 779 connection_id: Annotated[ 780 str, 781 Field(description="The ID of the connection to describe."), 782 ], 783 *, 784 workspace_id: Annotated[ 785 str | None, 786 Field( 787 description=WORKSPACE_ID_TIP_TEXT, 788 default=None, 789 ), 790 ], 791) -> CloudConnectionDetails: 792 """Get detailed information about a specific deployed connection.""" 793 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 794 connection = workspace.get_connection(connection_id=connection_id) 795 796 return CloudConnectionDetails( 797 connection_id=connection.connection_id, 798 connection_name=cast(str, connection.name), 799 connection_url=cast(str, connection.connection_url), 800 source_id=connection.source_id, 801 source_name=cast(str, connection.source.name), 802 destination_id=connection.destination_id, 803 destination_name=cast(str, connection.destination.name), 804 selected_streams=connection.stream_names, 805 table_prefix=connection.table_prefix, 806 )
Get detailed information about a specific deployed connection.
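A sketch that reads the structured details back from a connection (placeholder ID, direct invocation assumed):

from airbyte.mcp.cloud_ops import describe_cloud_connection

details = describe_cloud_connection(
    "33333333-3333-3333-3333-333333333333",  # placeholder connection ID
    workspace_id=None,
)
print(details.connection_name, details.source_name, "->", details.destination_name)
print("streams:", details.selected_streams, "prefix:", details.table_prefix)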
809@mcp_tool( 810 domain="cloud", 811 read_only=True, 812 idempotent=True, 813 open_world=True, 814 extra_help_text=CLOUD_AUTH_TIP_TEXT, 815) 816def get_cloud_sync_logs( 817 connection_id: Annotated[ 818 str, 819 Field(description="The ID of the Airbyte Cloud connection."), 820 ], 821 job_id: Annotated[ 822 int | None, 823 Field(description="Optional job ID. If not provided, the latest job will be used."), 824 ] = None, 825 attempt_number: Annotated[ 826 int | None, 827 Field( 828 description="Optional attempt number. If not provided, the latest attempt will be used." 829 ), 830 ] = None, 831 *, 832 workspace_id: Annotated[ 833 str | None, 834 Field( 835 description=WORKSPACE_ID_TIP_TEXT, 836 default=None, 837 ), 838 ], 839 max_lines: Annotated[ 840 int, 841 Field( 842 description=( 843 "Maximum number of lines to return. " 844 "Defaults to 4000 if not specified. " 845 "If '0' is provided, no limit is applied." 846 ), 847 default=4000, 848 ), 849 ], 850 from_tail: Annotated[ 851 bool | None, 852 Field( 853 description=( 854 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 855 "Defaults to True if `line_offset` is not specified. " 856 "Cannot combine `from_tail=True` with `line_offset`." 857 ), 858 default=None, 859 ), 860 ], 861 line_offset: Annotated[ 862 int | None, 863 Field( 864 description=( 865 "Number of lines to skip from the beginning of the logs. " 866 "Cannot be combined with `from_tail=True`." 867 ), 868 default=None, 869 ), 870 ], 871) -> LogReadResult: 872 """Get the logs from a sync job attempt on Airbyte Cloud.""" 873 # Validate that line_offset and from_tail are not both set 874 if line_offset is not None and from_tail: 875 raise PyAirbyteInputError( 876 message="Cannot specify both 'line_offset' and 'from_tail' parameters.", 877 context={"line_offset": line_offset, "from_tail": from_tail}, 878 ) 879 880 if from_tail is None and line_offset is None: 881 from_tail = True 882 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 883 connection = workspace.get_connection(connection_id=connection_id) 884 885 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 886 887 if not sync_result: 888 raise AirbyteMissingResourceError( 889 resource_type="sync job", 890 resource_name_or_id=connection_id, 891 ) 892 893 attempts = sync_result.get_attempts() 894 895 if not attempts: 896 raise AirbyteMissingResourceError( 897 resource_type="sync attempt", 898 resource_name_or_id=str(sync_result.job_id), 899 ) 900 901 if attempt_number is not None: 902 target_attempt = None 903 for attempt in attempts: 904 if attempt.attempt_number == attempt_number: 905 target_attempt = attempt 906 break 907 908 if target_attempt is None: 909 raise AirbyteMissingResourceError( 910 resource_type="sync attempt", 911 resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}", 912 ) 913 else: 914 target_attempt = max(attempts, key=lambda a: a.attempt_number) 915 916 logs = target_attempt.get_full_log_text() 917 918 if not logs: 919 # Return empty result with zero lines 920 return LogReadResult( 921 log_text=( 922 f"[No logs available for job '{sync_result.job_id}', " 923 f"attempt {target_attempt.attempt_number}.]" 924 ), 925 log_text_start_line=1, 926 log_text_line_count=0, 927 total_log_lines_available=0, 928 job_id=sync_result.job_id, 929 attempt_number=target_attempt.attempt_number, 930 ) 931 932 # Apply line limiting 933 log_lines = logs.splitlines() 934 total_lines = len(log_lines) 935 936 # Determine effective max_lines (0 
means no limit) 937 effective_max = total_lines if max_lines == 0 else max_lines 938 939 # Calculate start_index and slice based on from_tail or line_offset 940 if from_tail: 941 start_index = max(0, total_lines - effective_max) 942 selected_lines = log_lines[start_index:][:effective_max] 943 else: 944 start_index = line_offset or 0 945 selected_lines = log_lines[start_index : start_index + effective_max] 946 947 return LogReadResult( 948 log_text="\n".join(selected_lines), 949 log_text_start_line=start_index + 1, # Convert to 1-based index 950 log_text_line_count=len(selected_lines), 951 total_log_lines_available=total_lines, 952 job_id=sync_result.job_id, 953 attempt_number=target_attempt.attempt_number, 954 )
Get the logs from a sync job attempt on Airbyte Cloud.
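A sketch of tail-first log retrieval with a follow-up paged read, assuming direct invocation and a placeholder connection ID. Note that from_tail=True cannot be combined with line_offset.

from airbyte.mcp.cloud_ops import get_cloud_sync_logs

connection_id = "33333333-3333-3333-3333-333333333333"  # placeholder

# Grab the last 200 lines of the latest attempt of the latest job.
tail = get_cloud_sync_logs(
    connection_id,
    job_id=None,
    attempt_number=None,
    workspace_id=None,
    max_lines=200,
    from_tail=True,
    line_offset=None,
)
print(tail.log_text)

# If the tail was truncated, page through from the beginning instead.
if tail.total_log_lines_available > tail.log_text_line_count:
    first_page = get_cloud_sync_logs(
        connection_id,
        job_id=tail.job_id,
        attempt_number=tail.attempt_number,
        workspace_id=None,
        max_lines=200,
        from_tail=None,   # must not be True when line_offset is used
        line_offset=0,
    )
    print(first_page.log_text_start_line, first_page.log_text_line_count)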
957@mcp_tool( 958 domain="cloud", 959 read_only=True, 960 idempotent=True, 961 open_world=True, 962 extra_help_text=CLOUD_AUTH_TIP_TEXT, 963) 964def list_deployed_cloud_connections( 965 *, 966 workspace_id: Annotated[ 967 str | None, 968 Field( 969 description=WORKSPACE_ID_TIP_TEXT, 970 default=None, 971 ), 972 ], 973 name_contains: Annotated[ 974 str | None, 975 Field( 976 description="Optional case-insensitive substring to filter connections by name", 977 default=None, 978 ), 979 ], 980 max_items_limit: Annotated[ 981 int | None, 982 Field( 983 description="Optional maximum number of items to return (default: no limit)", 984 default=None, 985 ), 986 ], 987 with_connection_status: Annotated[ 988 bool | None, 989 Field( 990 description="If True, include status info for each connection's most recent sync job", 991 default=False, 992 ), 993 ], 994 failing_connections_only: Annotated[ 995 bool | None, 996 Field( 997 description="If True, only return connections with failed/cancelled last sync", 998 default=False, 999 ), 1000 ], 1001) -> list[CloudConnectionResult]: 1002 """List all deployed connections in the Airbyte Cloud workspace. 1003 1004 When with_connection_status is True, each connection result will include 1005 information about the most recent sync job status, skipping over any 1006 currently in-progress syncs to find the last completed job. 1007 1008 When failing_connections_only is True, only connections where the most 1009 recent completed sync job failed or was cancelled will be returned. 1010 This implicitly enables with_connection_status. 1011 """ 1012 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1013 connections = workspace.list_connections() 1014 1015 # Filter by name if requested 1016 if name_contains: 1017 needle = name_contains.lower() 1018 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1019 1020 # If failing_connections_only is True, implicitly enable with_connection_status 1021 if failing_connections_only: 1022 with_connection_status = True 1023 1024 results: list[CloudConnectionResult] = [] 1025 1026 for connection in connections: 1027 last_job_status: str | None = None 1028 last_job_id: int | None = None 1029 last_job_time: str | None = None 1030 currently_running_job_id: int | None = None 1031 currently_running_job_start_time: str | None = None 1032 1033 if with_connection_status: 1034 sync_logs = connection.get_previous_sync_logs(limit=5) 1035 last_completed_job_status = None # Keep enum for comparison 1036 1037 for sync_result in sync_logs: 1038 job_status = sync_result.get_job_status() 1039 1040 if not sync_result.is_job_complete(): 1041 currently_running_job_id = sync_result.job_id 1042 currently_running_job_start_time = sync_result.start_time.isoformat() 1043 continue 1044 1045 last_completed_job_status = job_status 1046 last_job_status = str(job_status.value) if job_status else None 1047 last_job_id = sync_result.job_id 1048 last_job_time = sync_result.start_time.isoformat() 1049 break 1050 1051 if failing_connections_only and ( 1052 last_completed_job_status is None 1053 or last_completed_job_status not in FAILED_STATUSES 1054 ): 1055 continue 1056 1057 results.append( 1058 CloudConnectionResult( 1059 id=connection.connection_id, 1060 name=cast(str, connection.name), 1061 url=cast(str, connection.connection_url), 1062 source_id=connection.source_id, 1063 destination_id=connection.destination_id, 1064 last_job_status=last_job_status, 1065 last_job_id=last_job_id, 1066 last_job_time=last_job_time, 1067 
currently_running_job_id=currently_running_job_id, 1068 currently_running_job_start_time=currently_running_job_start_time, 1069 ) 1070 ) 1071 1072 if max_items_limit is not None and len(results) >= max_items_limit: 1073 break 1074 1075 return results
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.
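A sketch that surfaces only unhealthy connections; failing_connections_only implies the per-connection status lookup, so with_connection_status is shown here only for clarity.

from airbyte.mcp.cloud_ops import list_deployed_cloud_connections

failing = list_deployed_cloud_connections(
    workspace_id=None,
    name_contains=None,
    max_items_limit=None,
    with_connection_status=True,
    failing_connections_only=True,
)
for conn in failing:
    print(conn.name, conn.last_job_status, conn.last_job_time, conn.url)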
1177@mcp_tool( 1178 domain="cloud", 1179 read_only=True, 1180 idempotent=True, 1181 open_world=True, 1182 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1183) 1184def list_cloud_workspaces( 1185 *, 1186 organization_id: Annotated[ 1187 str | None, 1188 Field( 1189 description="Organization ID. Required if organization_name is not provided.", 1190 default=None, 1191 ), 1192 ], 1193 organization_name: Annotated[ 1194 str | None, 1195 Field( 1196 description=( 1197 "Organization name (exact match). " "Required if organization_id is not provided." 1198 ), 1199 default=None, 1200 ), 1201 ], 1202 name_contains: Annotated[ 1203 str | None, 1204 Field( 1205 description="Optional substring to filter workspaces by name (server-side filtering)", 1206 default=None, 1207 ), 1208 ], 1209 max_items_limit: Annotated[ 1210 int | None, 1211 Field( 1212 description="Optional maximum number of items to return (default: no limit)", 1213 default=None, 1214 ), 1215 ], 1216) -> list[CloudWorkspaceResult]: 1217 """List all workspaces in a specific organization. 1218 1219 Requires either organization_id OR organization_name (exact match) to be provided. 1220 This tool will NOT list workspaces across all organizations - you must specify 1221 which organization to list workspaces from. 1222 """ 1223 api_root = resolve_cloud_api_url() 1224 client_id = resolve_cloud_client_id() 1225 client_secret = resolve_cloud_client_secret() 1226 1227 resolved_org_id = _resolve_organization_id( 1228 organization_id=organization_id, 1229 organization_name=organization_name, 1230 api_root=api_root, 1231 client_id=client_id, 1232 client_secret=client_secret, 1233 ) 1234 1235 workspaces = api_util.list_workspaces_in_organization( 1236 organization_id=resolved_org_id, 1237 api_root=api_root, 1238 client_id=client_id, 1239 client_secret=client_secret, 1240 name_contains=name_contains, 1241 max_items_limit=max_items_limit, 1242 ) 1243 1244 return [ 1245 CloudWorkspaceResult( 1246 id=ws.get("workspaceId", ""), 1247 name=ws.get("name", ""), 1248 organization_id=ws.get("organizationId", ""), 1249 ) 1250 for ws in workspaces 1251 ]
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided. This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.
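A sketch of listing workspaces by organization name; the organization name and filter values below are placeholders.

from airbyte.mcp.cloud_ops import list_cloud_workspaces

workspaces = list_cloud_workspaces(
    organization_id=None,
    organization_name="Acme Corp",   # exact-match placeholder; or pass organization_id instead
    name_contains="prod",            # server-side name filter
    max_items_limit=25,
)
for ws in workspaces:
    print(ws.id, ws.name, ws.organization_id)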
1254@mcp_tool( 1255 domain="cloud", 1256 read_only=True, 1257 idempotent=True, 1258 open_world=True, 1259 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1260) 1261def describe_cloud_organization( 1262 *, 1263 organization_id: Annotated[ 1264 str | None, 1265 Field( 1266 description="Organization ID. Required if organization_name is not provided.", 1267 default=None, 1268 ), 1269 ], 1270 organization_name: Annotated[ 1271 str | None, 1272 Field( 1273 description=( 1274 "Organization name (exact match). " "Required if organization_id is not provided." 1275 ), 1276 default=None, 1277 ), 1278 ], 1279) -> CloudOrganizationResult: 1280 """Get details about a specific organization. 1281 1282 Requires either organization_id OR organization_name (exact match) to be provided. 1283 This tool is useful for looking up an organization's ID from its name, or vice versa. 1284 """ 1285 api_root = resolve_cloud_api_url() 1286 client_id = resolve_cloud_client_id() 1287 client_secret = resolve_cloud_client_secret() 1288 1289 org = _resolve_organization( 1290 organization_id=organization_id, 1291 organization_name=organization_name, 1292 api_root=api_root, 1293 client_id=client_id, 1294 client_secret=client_secret, 1295 ) 1296 1297 return CloudOrganizationResult( 1298 id=org.organization_id, 1299 name=org.organization_name, 1300 email=org.email, 1301 )
Get details about a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided. This tool is useful for looking up an organization's ID from its name, or vice versa.
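A sketch of resolving an organization ID from its exact name (or vice versa); the name is a placeholder.

from airbyte.mcp.cloud_ops import describe_cloud_organization

org = describe_cloud_organization(
    organization_id=None,
    organization_name="Acme Corp",  # placeholder exact name
)
print(org.id, org.name, org.email)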
1318@mcp_tool( 1319 domain="cloud", 1320 open_world=True, 1321 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1322) 1323def publish_custom_source_definition( 1324 name: Annotated[ 1325 str, 1326 Field(description="The name for the custom connector definition."), 1327 ], 1328 *, 1329 workspace_id: Annotated[ 1330 str | None, 1331 Field( 1332 description=WORKSPACE_ID_TIP_TEXT, 1333 default=None, 1334 ), 1335 ], 1336 manifest_yaml: Annotated[ 1337 str | Path | None, 1338 Field( 1339 description=( 1340 "The Low-code CDK manifest as a YAML string or file path. " 1341 "Required for YAML connectors." 1342 ), 1343 default=None, 1344 ), 1345 ] = None, 1346 unique: Annotated[ 1347 bool, 1348 Field( 1349 description="Whether to require a unique name.", 1350 default=True, 1351 ), 1352 ] = True, 1353 pre_validate: Annotated[ 1354 bool, 1355 Field( 1356 description="Whether to validate the manifest client-side before publishing.", 1357 default=True, 1358 ), 1359 ] = True, 1360) -> str: 1361 """Publish a custom YAML source connector definition to Airbyte Cloud. 1362 1363 Note: Only YAML (declarative) connectors are currently supported. 1364 Docker-based custom sources are not yet available. 1365 """ 1366 processed_manifest = manifest_yaml 1367 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1368 processed_manifest = Path(manifest_yaml) 1369 1370 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1371 custom_source = workspace.publish_custom_source_definition( 1372 name=name, 1373 manifest_yaml=processed_manifest, 1374 unique=unique, 1375 pre_validate=pre_validate, 1376 ) 1377 register_guid_created_in_session(custom_source.definition_id) 1378 return ( 1379 "Successfully published custom YAML source definition:\n" 1380 + _get_custom_source_definition_description( 1381 custom_source=custom_source, 1382 ) 1383 + "\n" 1384 )
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
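A sketch of publishing a declarative (YAML) connector, with a placeholder manifest path. As in the code above, a single-line string is treated as a file path, so an inline manifest must be a multi-line YAML string.

from airbyte.mcp.cloud_ops import publish_custom_source_definition

message = publish_custom_source_definition(
    "My Custom API Source",
    workspace_id=None,
    manifest_yaml="./manifest.yaml",  # placeholder path; a multi-line string is treated as YAML text
    unique=True,
    pre_validate=True,                # validate client-side before publishing
)
print(message)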
1387@mcp_tool( 1388 domain="cloud", 1389 read_only=True, 1390 idempotent=True, 1391 open_world=True, 1392) 1393def list_custom_source_definitions( 1394 *, 1395 workspace_id: Annotated[ 1396 str | None, 1397 Field( 1398 description=WORKSPACE_ID_TIP_TEXT, 1399 default=None, 1400 ), 1401 ], 1402) -> list[dict[str, Any]]: 1403 """List custom YAML source definitions in the Airbyte Cloud workspace. 1404 1405 Note: Only YAML (declarative) connectors are currently supported. 1406 Docker-based custom sources are not yet available. 1407 """ 1408 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1409 definitions = workspace.list_custom_source_definitions( 1410 definition_type="yaml", 1411 ) 1412 1413 return [ 1414 { 1415 "definition_id": d.definition_id, 1416 "name": d.name, 1417 "version": d.version, 1418 "connector_builder_project_url": d.connector_builder_project_url, 1419 } 1420 for d in definitions 1421 ]
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
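A sketch of enumerating the workspace's YAML definitions; each entry is a plain dict as built in the code above.

from airbyte.mcp.cloud_ops import list_custom_source_definitions

for definition in list_custom_source_definitions(workspace_id=None):
    print(
        definition["definition_id"],
        definition["name"],
        definition["version"],
        definition["connector_builder_project_url"],
    )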
1424@mcp_tool( 1425 domain="cloud", 1426 destructive=True, 1427 open_world=True, 1428) 1429def update_custom_source_definition( 1430 definition_id: Annotated[ 1431 str, 1432 Field(description="The ID of the definition to update."), 1433 ], 1434 manifest_yaml: Annotated[ 1435 str | Path, 1436 Field( 1437 description="New manifest as YAML string or file path.", 1438 ), 1439 ], 1440 *, 1441 workspace_id: Annotated[ 1442 str | None, 1443 Field( 1444 description=WORKSPACE_ID_TIP_TEXT, 1445 default=None, 1446 ), 1447 ], 1448 pre_validate: Annotated[ 1449 bool, 1450 Field( 1451 description="Whether to validate the manifest client-side before updating.", 1452 default=True, 1453 ), 1454 ] = True, 1455) -> str: 1456 """Update a custom YAML source definition in Airbyte Cloud. 1457 1458 Note: Only YAML (declarative) connectors are currently supported. 1459 Docker-based custom sources are not yet available. 1460 """ 1461 check_guid_created_in_session(definition_id) 1462 processed_manifest = manifest_yaml 1463 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1464 processed_manifest = Path(manifest_yaml) 1465 1466 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1467 definition = workspace.get_custom_source_definition( 1468 definition_id=definition_id, 1469 definition_type="yaml", 1470 ) 1471 custom_source: CustomCloudSourceDefinition = definition.update_definition( 1472 manifest_yaml=processed_manifest, 1473 pre_validate=pre_validate, 1474 ) 1475 return ( 1476 "Successfully updated custom YAML source definition:\n" 1477 + _get_custom_source_definition_description( 1478 custom_source=custom_source, 1479 ) 1480 )
Update a custom YAML source definition in Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1483@mcp_tool( 1484 domain="cloud", 1485 destructive=True, 1486 open_world=True, 1487) 1488def permanently_delete_custom_source_definition( 1489 definition_id: Annotated[ 1490 str, 1491 Field(description="The ID of the custom source definition to delete."), 1492 ], 1493 name: Annotated[ 1494 str, 1495 Field(description="The expected name of the custom source definition (for verification)."), 1496 ], 1497 *, 1498 workspace_id: Annotated[ 1499 str | None, 1500 Field( 1501 description=WORKSPACE_ID_TIP_TEXT, 1502 default=None, 1503 ), 1504 ], 1505) -> str: 1506 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1507 1508 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1509 (case insensitive). 1510 1511 If the connector does not meet this requirement, the deletion will be rejected with a 1512 helpful error message. Instruct the user to rename the connector appropriately to authorize 1513 the deletion. 1514 1515 The provided name must match the actual name of the definition for the operation to proceed. 1516 This is a safety measure to ensure you are deleting the correct resource. 1517 1518 Note: Only YAML (declarative) connectors are currently supported. 1519 Docker-based custom sources are not yet available. 1520 """ 1521 check_guid_created_in_session(definition_id) 1522 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1523 definition = workspace.get_custom_source_definition( 1524 definition_id=definition_id, 1525 definition_type="yaml", 1526 ) 1527 actual_name: str = definition.name 1528 1529 # Verify the name matches 1530 if actual_name != name: 1531 raise PyAirbyteInputError( 1532 message=( 1533 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1534 "The provided name must exactly match the definition's actual name. " 1535 "This is a safety measure to prevent accidental deletion." 1536 ), 1537 context={ 1538 "definition_id": definition_id, 1539 "expected_name": name, 1540 "actual_name": actual_name, 1541 }, 1542 ) 1543 1544 definition.permanently_delete( 1545 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1546 ) 1547 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
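A sketch of the guarded delete call, with placeholder ID and name. The name argument must exactly match the definition's current name, that name must already contain "delete-me" or "deleteme", and session safe-mode checks may additionally block deletion of resources not created in the current session.

from airbyte.mcp.cloud_ops import permanently_delete_custom_source_definition

print(
    permanently_delete_custom_source_definition(
        "44444444-4444-4444-4444-444444444444",  # placeholder definition ID
        "My Custom API Source (delete-me)",      # expected current name, verified before deleting
        workspace_id=None,
    )
)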
1550@mcp_tool( 1551 domain="cloud", 1552 destructive=True, 1553 open_world=True, 1554 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1555) 1556def permanently_delete_cloud_source( 1557 source_id: Annotated[ 1558 str, 1559 Field(description="The ID of the deployed source to delete."), 1560 ], 1561 name: Annotated[ 1562 str, 1563 Field(description="The expected name of the source (for verification)."), 1564 ], 1565) -> str: 1566 """Permanently delete a deployed source connector from Airbyte Cloud. 1567 1568 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 1569 (case insensitive). 1570 1571 If the source does not meet this requirement, the deletion will be rejected with a 1572 helpful error message. Instruct the user to rename the source appropriately to authorize 1573 the deletion. 1574 1575 The provided name must match the actual name of the source for the operation to proceed. 1576 This is a safety measure to ensure you are deleting the correct resource. 1577 """ 1578 check_guid_created_in_session(source_id) 1579 workspace: CloudWorkspace = _get_cloud_workspace() 1580 source = workspace.get_source(source_id=source_id) 1581 actual_name: str = cast(str, source.name) 1582 1583 # Verify the name matches 1584 if actual_name != name: 1585 raise PyAirbyteInputError( 1586 message=( 1587 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1588 "The provided name must exactly match the source's actual name. " 1589 "This is a safety measure to prevent accidental deletion." 1590 ), 1591 context={ 1592 "source_id": source_id, 1593 "expected_name": name, 1594 "actual_name": actual_name, 1595 }, 1596 ) 1597 1598 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1599 workspace.permanently_delete_source( 1600 source=source_id, 1601 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 1602 ) 1603 return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.
The provided name must match the actual name of the source for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
1606@mcp_tool( 1607 domain="cloud", 1608 destructive=True, 1609 open_world=True, 1610 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1611) 1612def permanently_delete_cloud_destination( 1613 destination_id: Annotated[ 1614 str, 1615 Field(description="The ID of the deployed destination to delete."), 1616 ], 1617 name: Annotated[ 1618 str, 1619 Field(description="The expected name of the destination (for verification)."), 1620 ], 1621) -> str: 1622 """Permanently delete a deployed destination connector from Airbyte Cloud. 1623 1624 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 1625 (case insensitive). 1626 1627 If the destination does not meet this requirement, the deletion will be rejected with a 1628 helpful error message. Instruct the user to rename the destination appropriately to authorize 1629 the deletion. 1630 1631 The provided name must match the actual name of the destination for the operation to proceed. 1632 This is a safety measure to ensure you are deleting the correct resource. 1633 """ 1634 check_guid_created_in_session(destination_id) 1635 workspace: CloudWorkspace = _get_cloud_workspace() 1636 destination = workspace.get_destination(destination_id=destination_id) 1637 actual_name: str = cast(str, destination.name) 1638 1639 # Verify the name matches 1640 if actual_name != name: 1641 raise PyAirbyteInputError( 1642 message=( 1643 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1644 "The provided name must exactly match the destination's actual name. " 1645 "This is a safety measure to prevent accidental deletion." 1646 ), 1647 context={ 1648 "destination_id": destination_id, 1649 "expected_name": name, 1650 "actual_name": actual_name, 1651 }, 1652 ) 1653 1654 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1655 workspace.permanently_delete_destination( 1656 destination=destination_id, 1657 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 1658 ) 1659 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.
The provided name must match the actual name of the destination for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
1662@mcp_tool( 1663 domain="cloud", 1664 destructive=True, 1665 open_world=True, 1666 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1667) 1668def permanently_delete_cloud_connection( 1669 connection_id: Annotated[ 1670 str, 1671 Field(description="The ID of the connection to delete."), 1672 ], 1673 name: Annotated[ 1674 str, 1675 Field(description="The expected name of the connection (for verification)."), 1676 ], 1677 *, 1678 cascade_delete_source: Annotated[ 1679 bool, 1680 Field( 1681 description=( 1682 "Whether to also delete the source connector associated with this connection." 1683 ), 1684 default=False, 1685 ), 1686 ] = False, 1687 cascade_delete_destination: Annotated[ 1688 bool, 1689 Field( 1690 description=( 1691 "Whether to also delete the destination connector associated with this connection." 1692 ), 1693 default=False, 1694 ), 1695 ] = False, 1696) -> str: 1697 """Permanently delete a connection from Airbyte Cloud. 1698 1699 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 1700 (case insensitive). 1701 1702 If the connection does not meet this requirement, the deletion will be rejected with a 1703 helpful error message. Instruct the user to rename the connection appropriately to authorize 1704 the deletion. 1705 1706 The provided name must match the actual name of the connection for the operation to proceed. 1707 This is a safety measure to ensure you are deleting the correct resource. 1708 """ 1709 check_guid_created_in_session(connection_id) 1710 workspace: CloudWorkspace = _get_cloud_workspace() 1711 connection = workspace.get_connection(connection_id=connection_id) 1712 actual_name: str = cast(str, connection.name) 1713 1714 # Verify the name matches 1715 if actual_name != name: 1716 raise PyAirbyteInputError( 1717 message=( 1718 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1719 "The provided name must exactly match the connection's actual name. " 1720 "This is a safety measure to prevent accidental deletion." 1721 ), 1722 context={ 1723 "connection_id": connection_id, 1724 "expected_name": name, 1725 "actual_name": actual_name, 1726 }, 1727 ) 1728 1729 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1730 workspace.permanently_delete_connection( 1731 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 1732 connection=connection_id, 1733 cascade_delete_source=cascade_delete_source, 1734 cascade_delete_destination=cascade_delete_destination, 1735 ) 1736 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.
The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
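A sketch of deleting a connection together with its connectors via the cascade flags; the ID and name are placeholders, and the same "delete-me"/"deleteme" naming rule applies.

from airbyte.mcp.cloud_ops import permanently_delete_cloud_connection

print(
    permanently_delete_cloud_connection(
        "33333333-3333-3333-3333-333333333333",  # placeholder connection ID
        "faker-to-noop (delete-me)",             # must match the connection's actual name
        cascade_delete_source=True,              # also remove the attached source
        cascade_delete_destination=True,         # also remove the attached destination
    )
)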
1739@mcp_tool( 1740 domain="cloud", 1741 open_world=True, 1742 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1743) 1744def rename_cloud_source( 1745 source_id: Annotated[ 1746 str, 1747 Field(description="The ID of the deployed source to rename."), 1748 ], 1749 name: Annotated[ 1750 str, 1751 Field(description="New name for the source."), 1752 ], 1753 *, 1754 workspace_id: Annotated[ 1755 str | None, 1756 Field( 1757 description=WORKSPACE_ID_TIP_TEXT, 1758 default=None, 1759 ), 1760 ], 1761) -> str: 1762 """Rename a deployed source connector on Airbyte Cloud.""" 1763 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1764 source = workspace.get_source(source_id=source_id) 1765 source.rename(name=name) 1766 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
Rename a deployed source connector on Airbyte Cloud.
1769@mcp_tool( 1770 domain="cloud", 1771 destructive=True, 1772 open_world=True, 1773 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1774) 1775def update_cloud_source_config( 1776 source_id: Annotated[ 1777 str, 1778 Field(description="The ID of the deployed source to update."), 1779 ], 1780 config: Annotated[ 1781 dict | str, 1782 Field( 1783 description="New configuration for the source connector.", 1784 ), 1785 ], 1786 config_secret_name: Annotated[ 1787 str | None, 1788 Field( 1789 description="The name of the secret containing the configuration.", 1790 default=None, 1791 ), 1792 ] = None, 1793 *, 1794 workspace_id: Annotated[ 1795 str | None, 1796 Field( 1797 description=WORKSPACE_ID_TIP_TEXT, 1798 default=None, 1799 ), 1800 ], 1801) -> str: 1802 """Update a deployed source connector's configuration on Airbyte Cloud. 1803 1804 This is a destructive operation that can break existing connections if the 1805 configuration is changed incorrectly. Use with caution. 1806 """ 1807 check_guid_created_in_session(source_id) 1808 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1809 source = workspace.get_source(source_id=source_id) 1810 1811 config_dict = resolve_config( 1812 config=config, 1813 config_secret_name=config_secret_name, 1814 config_spec_jsonschema=None, # We don't have the spec here 1815 ) 1816 1817 source.update_config(config=config_dict) 1818 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
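As a sketch of what the tool ultimately does, the underlying call is source.update_config(config=...). The IDs, hostnames, and config keys below are hypothetical (field names depend on the connector's spec), and the CloudWorkspace constructor arguments are assumptions as before.

import os

from airbyte.cloud.workspaces import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"],
    client_id=os.environ["AIRBYTE_CLOUD_CLIENT_ID"],
    client_secret=os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"],
)

source = workspace.get_source(source_id="hypothetical-source-id")

# Hypothetical source-postgres style config; keys must match the connector spec.
source.update_config(
    config={
        "host": "db.internal.example.com",
        "port": 5432,
        "database": "analytics",
        "username": "airbyte_ro",
        "password": "REDACTED",  # via the MCP tool, prefer config_secret_name instead
    },
)

When calling the MCP tool, config may be a dict or a JSON string, and config_secret_name can point at a stored secret so credentials never appear in the agent conversation.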
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )
Rename a deployed destination connector on Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
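To keep credentials out of the conversation, the MCP tool accepts config_secret_name; the sketch below mirrors that idea in plain PyAirbyte by loading the full config JSON from a named secret before calling destination.update_config. The destination ID and secret name are hypothetical, get_secret is assumed to return the config as a JSON string, and the CloudWorkspace constructor arguments are assumptions as before.

import json
import os

from airbyte import get_secret
from airbyte.cloud.workspaces import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"],
    client_id=os.environ["AIRBYTE_CLOUD_CLIENT_ID"],
    client_secret=os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"],
)

destination = workspace.get_destination(destination_id="hypothetical-destination-id")

# Load the full connector config from a named secret (hypothetical name) instead of
# pasting credentials inline -- the same goal config_secret_name serves in the MCP tool.
config_json = get_secret("SNOWFLAKE_DESTINATION_CONFIG")
destination.update_config(config=json.loads(config_json))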
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )
Rename a connection on Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
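A brief sketch of the underlying call, connection.set_table_prefix(prefix=...). The connection ID and prefix are hypothetical, and the CloudWorkspace constructor arguments are assumptions as before; downstream tables are generally named with the prefix prepended to each stream name (e.g., staging_users).

import os

from airbyte.cloud.workspaces import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"],
    client_id=os.environ["AIRBYTE_CLOUD_CLIENT_ID"],
    client_secret=os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"],
)

connection = workspace.get_connection(connection_id="hypothetical-connection-id")
connection.set_table_prefix(prefix="staging_")  # hypothetical prefix
print(connection.connection_url)  # review the change in the Airbyte Cloud UI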
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
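A hedged sketch of the underlying call, connection.set_selected_streams(stream_names=[...]). The connection ID and stream names are hypothetical (they must exist in the source's catalog), and the CloudWorkspace constructor arguments are assumptions as before.

import os

from airbyte.cloud.workspaces import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"],
    client_id=os.environ["AIRBYTE_CLOUD_CLIENT_ID"],
    client_secret=os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"],
)

connection = workspace.get_connection(connection_id="hypothetical-connection-id")

# Sets the connection's stream selection to exactly this list (hypothetical names).
connection.set_selected_streams(stream_names=["users", "orders", "invoices"])

When calling the MCP tool, stream_names may be a single stream name or a list of stream names, as described in the parameter annotation above.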