airbyte.mcp.cloud_ops
Airbyte Cloud MCP operations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations.""" 3 4from pathlib import Path 5from typing import Annotated, Any, Literal, cast 6 7from fastmcp import FastMCP 8from pydantic import BaseModel, Field 9 10from airbyte import cloud, get_destination, get_source 11from airbyte._util import api_util 12from airbyte.cloud.connectors import CustomCloudSourceDefinition 13from airbyte.cloud.constants import FAILED_STATUSES 14from airbyte.cloud.workspaces import CloudWorkspace 15from airbyte.destinations.util import get_noop_destination 16from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError 17from airbyte.mcp._tool_utils import ( 18 check_guid_created_in_session, 19 mcp_tool, 20 register_guid_created_in_session, 21 register_tools, 22) 23from airbyte.mcp._util import ( 24 resolve_cloud_credentials, 25 resolve_config, 26 resolve_list_of_strings, 27 resolve_workspace_id, 28) 29from airbyte.secrets import SecretString 30 31 32CLOUD_AUTH_TIP_TEXT = ( 33 "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, " 34 "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables " 35 "will be used to authenticate with the Airbyte Cloud API." 36) 37WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var." 
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""


class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""


class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""


class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""


class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""


class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""


class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""


class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""


class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""


class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
    """Get an authenticated CloudWorkspace.

    Resolves credentials from multiple sources in order:
    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
    2. Environment variables

    Args:
        workspace_id: Optional workspace ID. If not provided, uses HTTP headers
            or the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
    """
    credentials = resolve_cloud_credentials()
    resolved_workspace_id = resolve_workspace_id(workspace_id)

    return CloudWorkspace(
        workspace_id=resolved_workspace_id,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
        api_root=credentials.api_root,
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # `no_executor=True`: the connector is not run locally; we only need its spec.
    source = get_source(
        source_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source.config_spec,
    )
    source.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_source = workspace.deploy_source(
        name=source_name,
        source=source,
        unique=unique,
    )

    register_guid_created_in_session(deployed_source.connector_id)
    return (
        f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'"
        f" and URL: {deployed_source.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID: {deployed_destination.connector_id}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_connection = workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=resolved_streams_list,
        table_prefix=table_prefix,
    )

    register_guid_created_in_session(deployed_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{deployed_connection.connection_id}' and "
        f"URL: {deployed_connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    if wait:
        status = sync_result.get_job_status()
        return (
            f"Sync completed with status: {status}. "
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )
    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, and organization info.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Get workspace details from the public API using workspace's credentials
    workspace_response = api_util.get_workspace(
        workspace_id=workspace.workspace_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    # Try to get organization info, but fail gracefully if we don't have permissions.
    # Fetching organization info requires ORGANIZATION_READER permissions on the organization,
    # which may not be available with workspace-scoped credentials.
    organization = workspace.get_organization(raise_on_error=False)

    return CloudWorkspaceResult(
        workspace_id=workspace_response.workspace_id,
        workspace_name=workspace_response.name,
        workspace_url=workspace.workspace_url,
        organization_id=(
            organization.organization_id
            if organization
            else "[unavailable - requires ORGANIZATION_READER permission]"
        ),
        organization_name=organization.organization_name if organization else None,
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # If a job ID is provided, get the job by ID.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        return {"status": None, "job_id": None, "attempts": []}

    # "attempts" stays an empty list unless include_attempts is requested below.
    result = {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": [],
    }

    if include_attempts:
        attempts = sync_result.get_attempts()
        result["attempts"] = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in attempts
        ]

    return result


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool allows you to retrieve a list of sync jobs for a connection,
    with control over ordering and pagination. By default, jobs are returned
    newest-first (from_tail=True).
    """
    # Validate that jobs_offset and from_tail are not both set
    if jobs_offset is not None and from_tail is True:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # Default to from_tail=True if neither is specified
    # (an explicit jobs_offset implies oldest-first ordering instead).
    if from_tail is None and jobs_offset is None:
        from_tail = True
    elif from_tail is None:
        from_tail = False

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Cap at 500 to avoid overloading agent context
    effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20

    sync_results = connection.get_previous_sync_logs(
        limit=effective_limit,
        offset=jobs_offset,
        from_tail=from_tail,
    )

    jobs = [
        SyncJobResult(
            job_id=sync_result.job_id,
            status=str(sync_result.get_job_status()),
            bytes_synced=sync_result.bytes_synced,
            records_synced=sync_result.records_synced,
            start_time=sync_result.start_time.isoformat(),
            job_url=sync_result.job_url,
        )
        for sync_result in sync_results
    ]

    return SyncJobListResult(
        jobs=jobs,
        jobs_count=len(jobs),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    sources = workspace.list_sources()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        sources = [s for s in sources if s.name is not None and needle in s.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        sources = sources[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=source.source_id,
            name=cast(str, source.name),
            url=cast(str, source.connector_url),
        )
        for source in sources
    ]


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destinations = workspace.list_destinations()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()]

    # Apply limit if requested
    if max_items_limit is not None:
        destinations = destinations[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=destination.destination_id,
            name=cast(str, destination.name),
            url=cast(str, destination.connector_url),
        )
        for destination in destinations
    ]


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)

    # Access name property to ensure _connector_info is populated
    source_name = cast(str, source.name)

    return CloudSourceDetails(
        source_id=source.source_id,
        source_name=source_name,
        source_url=source.connector_url,
        connector_definition_id=source._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Access name property to ensure _connector_info is populated
    destination_name = cast(str, destination.name)

    return CloudDestinationDetails(
        destination_id=destination.destination_id,
        destination_name=destination_name,
        destination_url=destination.connector_url,
        connector_definition_id=destination._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    return CloudConnectionDetails(
        connection_id=connection.connection_id,
        connection_name=cast(str, connection.name),
        connection_url=cast(str, connection.connection_url),
        source_id=connection.source_id,
        source_name=cast(str, connection.source.name),
        destination_id=connection.destination_id,
        destination_name=cast(str, connection.destination.name),
        selected_streams=connection.stream_names,
        table_prefix=connection.table_prefix,
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # Tail is the default only when no explicit offset was given.
    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            # Look back over the last few jobs so an in-progress sync
            # doesn't hide the most recent *completed* result.
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        # Limit is applied after filtering, so it counts returned items only.
        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results


def _resolve_organization(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> api_util.models.OrganizationResponse:
    """Resolve organization from either ID or exact name match.
1252 1253 Args: 1254 organization_id: The organization ID (if provided directly) 1255 organization_name: The organization name (exact match required) 1256 api_root: The API root URL 1257 client_id: OAuth client ID (optional if bearer_token is provided) 1258 client_secret: OAuth client secret (optional if bearer_token is provided) 1259 bearer_token: Bearer token for authentication (optional if client credentials provided) 1260 1261 Returns: 1262 The resolved OrganizationResponse object 1263 1264 Raises: 1265 PyAirbyteInputError: If neither or both parameters are provided, 1266 or if no organization matches the exact name 1267 AirbyteMissingResourceError: If the organization is not found 1268 """ 1269 if organization_id and organization_name: 1270 raise PyAirbyteInputError( 1271 message="Provide either 'organization_id' or 'organization_name', not both." 1272 ) 1273 if not organization_id and not organization_name: 1274 raise PyAirbyteInputError( 1275 message="Either 'organization_id' or 'organization_name' must be provided." 
1276 ) 1277 1278 # Get all organizations for the user 1279 orgs = api_util.list_organizations_for_user( 1280 api_root=api_root, 1281 client_id=client_id, 1282 client_secret=client_secret, 1283 bearer_token=bearer_token, 1284 ) 1285 1286 if organization_id: 1287 # Find by ID 1288 matching_orgs = [org for org in orgs if org.organization_id == organization_id] 1289 if not matching_orgs: 1290 raise AirbyteMissingResourceError( 1291 resource_type="organization", 1292 context={ 1293 "organization_id": organization_id, 1294 "message": f"No organization found with ID '{organization_id}' " 1295 "for the current user.", 1296 }, 1297 ) 1298 return matching_orgs[0] 1299 1300 # Find by exact name match (case-sensitive) 1301 matching_orgs = [org for org in orgs if org.organization_name == organization_name] 1302 1303 if not matching_orgs: 1304 raise AirbyteMissingResourceError( 1305 resource_type="organization", 1306 context={ 1307 "organization_name": organization_name, 1308 "message": f"No organization found with exact name '{organization_name}' " 1309 "for the current user.", 1310 }, 1311 ) 1312 1313 if len(matching_orgs) > 1: 1314 raise PyAirbyteInputError( 1315 message=f"Multiple organizations found with name '{organization_name}'. " 1316 "Please use 'organization_id' instead to specify the exact organization." 1317 ) 1318 1319 return matching_orgs[0] 1320 1321 1322def _resolve_organization_id( 1323 organization_id: str | None, 1324 organization_name: str | None, 1325 *, 1326 api_root: str, 1327 client_id: SecretString | None, 1328 client_secret: SecretString | None, 1329 bearer_token: SecretString | None = None, 1330) -> str: 1331 """Resolve organization ID from either ID or exact name match. 1332 1333 This is a convenience wrapper around _resolve_organization that returns just the ID. 
1334 """ 1335 org = _resolve_organization( 1336 organization_id=organization_id, 1337 organization_name=organization_name, 1338 api_root=api_root, 1339 client_id=client_id, 1340 client_secret=client_secret, 1341 bearer_token=bearer_token, 1342 ) 1343 return org.organization_id 1344 1345 1346@mcp_tool( 1347 domain="cloud", 1348 read_only=True, 1349 idempotent=True, 1350 open_world=True, 1351 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1352) 1353def list_cloud_workspaces( 1354 *, 1355 organization_id: Annotated[ 1356 str | None, 1357 Field( 1358 description="Organization ID. Required if organization_name is not provided.", 1359 default=None, 1360 ), 1361 ], 1362 organization_name: Annotated[ 1363 str | None, 1364 Field( 1365 description=( 1366 "Organization name (exact match). " "Required if organization_id is not provided." 1367 ), 1368 default=None, 1369 ), 1370 ], 1371 name_contains: Annotated[ 1372 str | None, 1373 Field( 1374 description="Optional substring to filter workspaces by name (server-side filtering)", 1375 default=None, 1376 ), 1377 ], 1378 max_items_limit: Annotated[ 1379 int | None, 1380 Field( 1381 description="Optional maximum number of items to return (default: no limit)", 1382 default=None, 1383 ), 1384 ], 1385) -> list[CloudWorkspaceResult]: 1386 """List all workspaces in a specific organization. 1387 1388 Requires either organization_id OR organization_name (exact match) to be provided. 1389 This tool will NOT list workspaces across all organizations - you must specify 1390 which organization to list workspaces from. 
1391 """ 1392 credentials = resolve_cloud_credentials() 1393 1394 resolved_org_id = _resolve_organization_id( 1395 organization_id=organization_id, 1396 organization_name=organization_name, 1397 api_root=credentials.api_root, 1398 client_id=credentials.client_id, 1399 client_secret=credentials.client_secret, 1400 bearer_token=credentials.bearer_token, 1401 ) 1402 1403 workspaces = api_util.list_workspaces_in_organization( 1404 organization_id=resolved_org_id, 1405 api_root=credentials.api_root, 1406 client_id=credentials.client_id, 1407 client_secret=credentials.client_secret, 1408 bearer_token=credentials.bearer_token, 1409 name_contains=name_contains, 1410 max_items_limit=max_items_limit, 1411 ) 1412 1413 return [ 1414 CloudWorkspaceResult( 1415 workspace_id=ws.get("workspaceId", ""), 1416 workspace_name=ws.get("name", ""), 1417 organization_id=ws.get("organizationId", ""), 1418 ) 1419 for ws in workspaces 1420 ] 1421 1422 1423@mcp_tool( 1424 domain="cloud", 1425 read_only=True, 1426 idempotent=True, 1427 open_world=True, 1428 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1429) 1430def describe_cloud_organization( 1431 *, 1432 organization_id: Annotated[ 1433 str | None, 1434 Field( 1435 description="Organization ID. Required if organization_name is not provided.", 1436 default=None, 1437 ), 1438 ], 1439 organization_name: Annotated[ 1440 str | None, 1441 Field( 1442 description=( 1443 "Organization name (exact match). " "Required if organization_id is not provided." 1444 ), 1445 default=None, 1446 ), 1447 ], 1448) -> CloudOrganizationResult: 1449 """Get details about a specific organization. 1450 1451 Requires either organization_id OR organization_name (exact match) to be provided. 1452 This tool is useful for looking up an organization's ID from its name, or vice versa. 
1453 """ 1454 credentials = resolve_cloud_credentials() 1455 1456 org = _resolve_organization( 1457 organization_id=organization_id, 1458 organization_name=organization_name, 1459 api_root=credentials.api_root, 1460 client_id=credentials.client_id, 1461 client_secret=credentials.client_secret, 1462 bearer_token=credentials.bearer_token, 1463 ) 1464 1465 return CloudOrganizationResult( 1466 id=org.organization_id, 1467 name=org.organization_name, 1468 email=org.email, 1469 ) 1470 1471 1472def _get_custom_source_definition_description( 1473 custom_source: CustomCloudSourceDefinition, 1474) -> str: 1475 return "\n".join( 1476 [ 1477 f" - Custom Source Name: {custom_source.name}", 1478 f" - Definition ID: {custom_source.definition_id}", 1479 f" - Definition Version: {custom_source.version}", 1480 f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}", 1481 f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}", 1482 ] 1483 ) 1484 1485 1486@mcp_tool( 1487 domain="cloud", 1488 open_world=True, 1489 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1490) 1491def publish_custom_source_definition( 1492 name: Annotated[ 1493 str, 1494 Field(description="The name for the custom connector definition."), 1495 ], 1496 *, 1497 workspace_id: Annotated[ 1498 str | None, 1499 Field( 1500 description=WORKSPACE_ID_TIP_TEXT, 1501 default=None, 1502 ), 1503 ], 1504 manifest_yaml: Annotated[ 1505 str | Path | None, 1506 Field( 1507 description=( 1508 "The Low-code CDK manifest as a YAML string or file path. " 1509 "Required for YAML connectors." 
1510 ), 1511 default=None, 1512 ), 1513 ] = None, 1514 unique: Annotated[ 1515 bool, 1516 Field( 1517 description="Whether to require a unique name.", 1518 default=True, 1519 ), 1520 ] = True, 1521 pre_validate: Annotated[ 1522 bool, 1523 Field( 1524 description="Whether to validate the manifest client-side before publishing.", 1525 default=True, 1526 ), 1527 ] = True, 1528 testing_values: Annotated[ 1529 dict | str | None, 1530 Field( 1531 description=( 1532 "Optional testing configuration values for the Builder UI. " 1533 "Can be provided as a JSON object or JSON string. " 1534 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1535 "If provided, these values replace any existing testing values " 1536 "for the connector builder project, allowing immediate test read operations." 1537 ), 1538 default=None, 1539 ), 1540 ], 1541 testing_values_secret_name: Annotated[ 1542 str | None, 1543 Field( 1544 description=( 1545 "Optional name of a secret containing testing configuration values " 1546 "in JSON or YAML format. The secret will be resolved by the MCP " 1547 "server and merged into testing_values, with secret values taking " 1548 "precedence. This lets the agent reference secrets without sending " 1549 "raw values as tool arguments." 1550 ), 1551 default=None, 1552 ), 1553 ], 1554) -> str: 1555 """Publish a custom YAML source connector definition to Airbyte Cloud. 1556 1557 Note: Only YAML (declarative) connectors are currently supported. 1558 Docker-based custom sources are not yet available. 
1559 """ 1560 processed_manifest = manifest_yaml 1561 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1562 processed_manifest = Path(manifest_yaml) 1563 1564 # Resolve testing values from inline config and/or secret 1565 testing_values_dict: dict[str, Any] | None = None 1566 if testing_values is not None or testing_values_secret_name is not None: 1567 testing_values_dict = ( 1568 resolve_config( 1569 config=testing_values, 1570 config_secret_name=testing_values_secret_name, 1571 ) 1572 or None 1573 ) 1574 1575 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1576 custom_source = workspace.publish_custom_source_definition( 1577 name=name, 1578 manifest_yaml=processed_manifest, 1579 unique=unique, 1580 pre_validate=pre_validate, 1581 testing_values=testing_values_dict, 1582 ) 1583 register_guid_created_in_session(custom_source.definition_id) 1584 return ( 1585 "Successfully published custom YAML source definition:\n" 1586 + _get_custom_source_definition_description( 1587 custom_source=custom_source, 1588 ) 1589 + "\n" 1590 ) 1591 1592 1593@mcp_tool( 1594 domain="cloud", 1595 read_only=True, 1596 idempotent=True, 1597 open_world=True, 1598) 1599def list_custom_source_definitions( 1600 *, 1601 workspace_id: Annotated[ 1602 str | None, 1603 Field( 1604 description=WORKSPACE_ID_TIP_TEXT, 1605 default=None, 1606 ), 1607 ], 1608) -> list[dict[str, Any]]: 1609 """List custom YAML source definitions in the Airbyte Cloud workspace. 1610 1611 Note: Only YAML (declarative) connectors are currently supported. 1612 Docker-based custom sources are not yet available. 
1613 """ 1614 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1615 definitions = workspace.list_custom_source_definitions( 1616 definition_type="yaml", 1617 ) 1618 1619 return [ 1620 { 1621 "definition_id": d.definition_id, 1622 "name": d.name, 1623 "version": d.version, 1624 "connector_builder_project_url": d.connector_builder_project_url, 1625 } 1626 for d in definitions 1627 ] 1628 1629 1630@mcp_tool( 1631 domain="cloud", 1632 destructive=True, 1633 open_world=True, 1634) 1635def update_custom_source_definition( 1636 definition_id: Annotated[ 1637 str, 1638 Field(description="The ID of the definition to update."), 1639 ], 1640 manifest_yaml: Annotated[ 1641 str | Path | None, 1642 Field( 1643 description=( 1644 "New manifest as YAML string or file path. " 1645 "Optional; omit to update only testing values." 1646 ), 1647 default=None, 1648 ), 1649 ] = None, 1650 *, 1651 workspace_id: Annotated[ 1652 str | None, 1653 Field( 1654 description=WORKSPACE_ID_TIP_TEXT, 1655 default=None, 1656 ), 1657 ], 1658 pre_validate: Annotated[ 1659 bool, 1660 Field( 1661 description="Whether to validate the manifest client-side before updating.", 1662 default=True, 1663 ), 1664 ] = True, 1665 testing_values: Annotated[ 1666 dict | str | None, 1667 Field( 1668 description=( 1669 "Optional testing configuration values for the Builder UI. " 1670 "Can be provided as a JSON object or JSON string. " 1671 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1672 "If provided, these values replace any existing testing values " 1673 "for the connector builder project. The entire testing values object " 1674 "is overwritten, so pass the full set of values you want to persist." 1675 ), 1676 default=None, 1677 ), 1678 ], 1679 testing_values_secret_name: Annotated[ 1680 str | None, 1681 Field( 1682 description=( 1683 "Optional name of a secret containing testing configuration values " 1684 "in JSON or YAML format. 
The secret will be resolved by the MCP " 1685 "server and merged into testing_values, with secret values taking " 1686 "precedence. This lets the agent reference secrets without sending " 1687 "raw values as tool arguments." 1688 ), 1689 default=None, 1690 ), 1691 ], 1692) -> str: 1693 """Update a custom YAML source definition in Airbyte Cloud. 1694 1695 Updates the manifest and/or testing values for an existing custom source definition. 1696 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1697 """ 1698 check_guid_created_in_session(definition_id) 1699 1700 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1701 1702 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1703 raise PyAirbyteInputError( 1704 message=( 1705 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1706 "must be provided to update a custom source definition." 1707 ), 1708 context={ 1709 "definition_id": definition_id, 1710 "workspace_id": workspace.workspace_id, 1711 }, 1712 ) 1713 1714 processed_manifest: str | Path | None = manifest_yaml 1715 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1716 processed_manifest = Path(manifest_yaml) 1717 1718 # Resolve testing values from inline config and/or secret 1719 testing_values_dict: dict[str, Any] | None = None 1720 if testing_values is not None or testing_values_secret_name is not None: 1721 testing_values_dict = ( 1722 resolve_config( 1723 config=testing_values, 1724 config_secret_name=testing_values_secret_name, 1725 ) 1726 or None 1727 ) 1728 1729 definition = workspace.get_custom_source_definition( 1730 definition_id=definition_id, 1731 definition_type="yaml", 1732 ) 1733 custom_source: CustomCloudSourceDefinition = definition 1734 1735 if processed_manifest is not None: 1736 custom_source = definition.update_definition( 1737 manifest_yaml=processed_manifest, 1738 pre_validate=pre_validate, 1739 ) 
1740 1741 if testing_values_dict is not None: 1742 custom_source.set_testing_values(testing_values_dict) 1743 1744 return ( 1745 "Successfully updated custom YAML source definition:\n" 1746 + _get_custom_source_definition_description( 1747 custom_source=custom_source, 1748 ) 1749 ) 1750 1751 1752@mcp_tool( 1753 domain="cloud", 1754 destructive=True, 1755 open_world=True, 1756) 1757def permanently_delete_custom_source_definition( 1758 definition_id: Annotated[ 1759 str, 1760 Field(description="The ID of the custom source definition to delete."), 1761 ], 1762 name: Annotated[ 1763 str, 1764 Field(description="The expected name of the custom source definition (for verification)."), 1765 ], 1766 *, 1767 workspace_id: Annotated[ 1768 str | None, 1769 Field( 1770 description=WORKSPACE_ID_TIP_TEXT, 1771 default=None, 1772 ), 1773 ], 1774) -> str: 1775 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1776 1777 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1778 (case insensitive). 1779 1780 If the connector does not meet this requirement, the deletion will be rejected with a 1781 helpful error message. Instruct the user to rename the connector appropriately to authorize 1782 the deletion. 1783 1784 The provided name must match the actual name of the definition for the operation to proceed. 1785 This is a safety measure to ensure you are deleting the correct resource. 1786 1787 Note: Only YAML (declarative) connectors are currently supported. 1788 Docker-based custom sources are not yet available. 
1789 """ 1790 check_guid_created_in_session(definition_id) 1791 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 1792 definition = workspace.get_custom_source_definition( 1793 definition_id=definition_id, 1794 definition_type="yaml", 1795 ) 1796 actual_name: str = definition.name 1797 1798 # Verify the name matches 1799 if actual_name != name: 1800 raise PyAirbyteInputError( 1801 message=( 1802 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1803 "The provided name must exactly match the definition's actual name. " 1804 "This is a safety measure to prevent accidental deletion." 1805 ), 1806 context={ 1807 "definition_id": definition_id, 1808 "expected_name": name, 1809 "actual_name": actual_name, 1810 }, 1811 ) 1812 1813 definition.permanently_delete( 1814 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1815 ) 1816 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})" 1817 1818 1819@mcp_tool( 1820 domain="cloud", 1821 destructive=True, 1822 open_world=True, 1823 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1824) 1825def permanently_delete_cloud_source( 1826 source_id: Annotated[ 1827 str, 1828 Field(description="The ID of the deployed source to delete."), 1829 ], 1830 name: Annotated[ 1831 str, 1832 Field(description="The expected name of the source (for verification)."), 1833 ], 1834) -> str: 1835 """Permanently delete a deployed source connector from Airbyte Cloud. 1836 1837 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 1838 (case insensitive). 1839 1840 If the source does not meet this requirement, the deletion will be rejected with a 1841 helpful error message. Instruct the user to rename the source appropriately to authorize 1842 the deletion. 1843 1844 The provided name must match the actual name of the source for the operation to proceed. 1845 This is a safety measure to ensure you are deleting the correct resource. 
1846 """ 1847 check_guid_created_in_session(source_id) 1848 workspace: CloudWorkspace = _get_cloud_workspace() 1849 source = workspace.get_source(source_id=source_id) 1850 actual_name: str = cast(str, source.name) 1851 1852 # Verify the name matches 1853 if actual_name != name: 1854 raise PyAirbyteInputError( 1855 message=( 1856 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1857 "The provided name must exactly match the source's actual name. " 1858 "This is a safety measure to prevent accidental deletion." 1859 ), 1860 context={ 1861 "source_id": source_id, 1862 "expected_name": name, 1863 "actual_name": actual_name, 1864 }, 1865 ) 1866 1867 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1868 workspace.permanently_delete_source( 1869 source=source_id, 1870 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 1871 ) 1872 return f"Successfully deleted source '{actual_name}' (ID: {source_id})" 1873 1874 1875@mcp_tool( 1876 domain="cloud", 1877 destructive=True, 1878 open_world=True, 1879 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1880) 1881def permanently_delete_cloud_destination( 1882 destination_id: Annotated[ 1883 str, 1884 Field(description="The ID of the deployed destination to delete."), 1885 ], 1886 name: Annotated[ 1887 str, 1888 Field(description="The expected name of the destination (for verification)."), 1889 ], 1890) -> str: 1891 """Permanently delete a deployed destination connector from Airbyte Cloud. 1892 1893 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 1894 (case insensitive). 1895 1896 If the destination does not meet this requirement, the deletion will be rejected with a 1897 helpful error message. Instruct the user to rename the destination appropriately to authorize 1898 the deletion. 1899 1900 The provided name must match the actual name of the destination for the operation to proceed. 
1901 This is a safety measure to ensure you are deleting the correct resource. 1902 """ 1903 check_guid_created_in_session(destination_id) 1904 workspace: CloudWorkspace = _get_cloud_workspace() 1905 destination = workspace.get_destination(destination_id=destination_id) 1906 actual_name: str = cast(str, destination.name) 1907 1908 # Verify the name matches 1909 if actual_name != name: 1910 raise PyAirbyteInputError( 1911 message=( 1912 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1913 "The provided name must exactly match the destination's actual name. " 1914 "This is a safety measure to prevent accidental deletion." 1915 ), 1916 context={ 1917 "destination_id": destination_id, 1918 "expected_name": name, 1919 "actual_name": actual_name, 1920 }, 1921 ) 1922 1923 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1924 workspace.permanently_delete_destination( 1925 destination=destination_id, 1926 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 1927 ) 1928 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})" 1929 1930 1931@mcp_tool( 1932 domain="cloud", 1933 destructive=True, 1934 open_world=True, 1935 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1936) 1937def permanently_delete_cloud_connection( 1938 connection_id: Annotated[ 1939 str, 1940 Field(description="The ID of the connection to delete."), 1941 ], 1942 name: Annotated[ 1943 str, 1944 Field(description="The expected name of the connection (for verification)."), 1945 ], 1946 *, 1947 cascade_delete_source: Annotated[ 1948 bool, 1949 Field( 1950 description=( 1951 "Whether to also delete the source connector associated with this connection." 1952 ), 1953 default=False, 1954 ), 1955 ] = False, 1956 cascade_delete_destination: Annotated[ 1957 bool, 1958 Field( 1959 description=( 1960 "Whether to also delete the destination connector associated with this connection." 
1961 ), 1962 default=False, 1963 ), 1964 ] = False, 1965) -> str: 1966 """Permanently delete a connection from Airbyte Cloud. 1967 1968 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 1969 (case insensitive). 1970 1971 If the connection does not meet this requirement, the deletion will be rejected with a 1972 helpful error message. Instruct the user to rename the connection appropriately to authorize 1973 the deletion. 1974 1975 The provided name must match the actual name of the connection for the operation to proceed. 1976 This is a safety measure to ensure you are deleting the correct resource. 1977 """ 1978 check_guid_created_in_session(connection_id) 1979 workspace: CloudWorkspace = _get_cloud_workspace() 1980 connection = workspace.get_connection(connection_id=connection_id) 1981 actual_name: str = cast(str, connection.name) 1982 1983 # Verify the name matches 1984 if actual_name != name: 1985 raise PyAirbyteInputError( 1986 message=( 1987 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1988 "The provided name must exactly match the connection's actual name. " 1989 "This is a safety measure to prevent accidental deletion." 
1990 ), 1991 context={ 1992 "connection_id": connection_id, 1993 "expected_name": name, 1994 "actual_name": actual_name, 1995 }, 1996 ) 1997 1998 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1999 workspace.permanently_delete_connection( 2000 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2001 connection=connection_id, 2002 cascade_delete_source=cascade_delete_source, 2003 cascade_delete_destination=cascade_delete_destination, 2004 ) 2005 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" 2006 2007 2008@mcp_tool( 2009 domain="cloud", 2010 open_world=True, 2011 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2012) 2013def rename_cloud_source( 2014 source_id: Annotated[ 2015 str, 2016 Field(description="The ID of the deployed source to rename."), 2017 ], 2018 name: Annotated[ 2019 str, 2020 Field(description="New name for the source."), 2021 ], 2022 *, 2023 workspace_id: Annotated[ 2024 str | None, 2025 Field( 2026 description=WORKSPACE_ID_TIP_TEXT, 2027 default=None, 2028 ), 2029 ], 2030) -> str: 2031 """Rename a deployed source connector on Airbyte Cloud.""" 2032 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2033 source = workspace.get_source(source_id=source_id) 2034 source.rename(name=name) 2035 return f"Successfully renamed source '{source_id}' to '{name}'. 
URL: {source.connector_url}" 2036 2037 2038@mcp_tool( 2039 domain="cloud", 2040 destructive=True, 2041 open_world=True, 2042 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2043) 2044def update_cloud_source_config( 2045 source_id: Annotated[ 2046 str, 2047 Field(description="The ID of the deployed source to update."), 2048 ], 2049 config: Annotated[ 2050 dict | str, 2051 Field( 2052 description="New configuration for the source connector.", 2053 ), 2054 ], 2055 config_secret_name: Annotated[ 2056 str | None, 2057 Field( 2058 description="The name of the secret containing the configuration.", 2059 default=None, 2060 ), 2061 ] = None, 2062 *, 2063 workspace_id: Annotated[ 2064 str | None, 2065 Field( 2066 description=WORKSPACE_ID_TIP_TEXT, 2067 default=None, 2068 ), 2069 ], 2070) -> str: 2071 """Update a deployed source connector's configuration on Airbyte Cloud. 2072 2073 This is a destructive operation that can break existing connections if the 2074 configuration is changed incorrectly. Use with caution. 2075 """ 2076 check_guid_created_in_session(source_id) 2077 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2078 source = workspace.get_source(source_id=source_id) 2079 2080 config_dict = resolve_config( 2081 config=config, 2082 config_secret_name=config_secret_name, 2083 config_spec_jsonschema=None, # We don't have the spec here 2084 ) 2085 2086 source.update_config(config=config_dict) 2087 return f"Successfully updated source '{source_id}'. 
URL: {source.connector_url}"


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    # Safe-mode guard: destructive operations may be restricted to resources
    # created within the current MCP session (see register_cloud_ops_tools).
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    # Config may be given inline (dict or JSON string) or via a named secret;
    # no spec is available at this point, so no schema validation occurs here.
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}"
    )


@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    # Safe-mode guard: destructive operations may be restricted to resources
    # created within the current MCP session (see register_cloud_ops_tools).
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    # Safe-mode guard: destructive operations may be restricted to resources
    # created within the current MCP session (see register_cloud_ops_tools).
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # Accepts either a single name or a list; normalized to a list here.
    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )


@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any] | list[dict[str, Any]]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the persisted state for incremental syncs as a list of
      stream state objects, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    if artifact_type == "state":
        result = connection.get_state_artifacts()
        if result is None:
            # "Missing" is reported in-band rather than raised, so agents can
            # distinguish an empty state from a transport error.
            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
        return result

    # artifact_type == "catalog"
    result = connection.get_catalog_artifact()
    if result is None:
        return {"ERROR": "No catalog found for this connection"}
    return result


def register_cloud_ops_tools(app: FastMCP) -> None:
    """@private Register tools with the FastMCP app.

    This is an internal function and should not be called directly.

    Tools are filtered based on mode settings:
    - AIRBYTE_CLOUD_MCP_READONLY_MODE=1: Only read-only tools are registered
    - AIRBYTE_CLOUD_MCP_SAFE_MODE=1: All tools are registered, but destructive
      operations are protected by runtime session checks
    """
    # Registration metadata comes from each function's @mcp_tool decorator.
    register_tools(app, domain="cloud")
class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    # Summary record returned by `list_deployed_cloud_source_connectors`.
    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""
Information about a deployed source connector in Airbyte Cloud.
class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    # Summary record returned by `list_deployed_cloud_destination_connectors`.
    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""
Information about a deployed destination connector in Airbyte Cloud.
class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    # The `last_job_*` / `currently_running_*` fields are optional extras,
    # populated only when a caller asks for connection status
    # (with_connection_status=True); otherwise they stay None.
    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""
Information about a deployed connection in Airbyte Cloud.
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.
Job ID of the most recent completed sync. Only populated when with_connection_status=True.
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.
class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    # Detail record returned by `describe_cloud_source`.
    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""
Detailed information about a deployed source connector in Airbyte Cloud.
class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    # Detail record returned by `describe_cloud_destination`.
    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
Detailed information about a deployed destination connector in Airbyte Cloud.
class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    # Detail record returned by `describe_cloud_connection`.
    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""
Detailed information about a deployed connection in Airbyte Cloud.
class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
Information about an organization in Airbyte Cloud.
class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    # Returned by `check_airbyte_cloud_workspace`; the organization fields
    # degrade to a placeholder/None when credentials lack the
    # ORGANIZATION_READER permission.
    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
Information about a workspace in Airbyte Cloud.
class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    # Returned by `get_cloud_sync_logs`; `total_log_lines_available` lets the
    # caller detect truncation and paginate via line_offset/from_tail.
    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""
Result of reading sync logs with pagination support.
class SyncJobResult(BaseModel):
    """Information about a sync job."""

    # One entry of `SyncJobListResult.jobs`, built by `list_cloud_sync_jobs`.
    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""
Information about a sync job.
class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    # Envelope returned by `list_cloud_sync_jobs`.
    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
Result of listing sync jobs with pagination support.
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_source_to_cloud(
    source_name: Annotated[
        str,
        Field(description="The name to use when deploying the source."),
    ],
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the source connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a source connector to Airbyte Cloud."""
    # Build a local, non-executing source object so the config can be
    # resolved against the connector's spec and validated before deploying.
    source_obj = get_source(
        source_connector_name,
        no_executor=True,
    )
    resolved_config = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=source_obj.config_spec,
    )
    source_obj.set_config(resolved_config, validate=True)

    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed = ws.deploy_source(
        name=source_name,
        source=source_obj,
        unique=unique,
    )

    # Track the new resource so safe-mode destructive checks can allow it.
    register_guid_created_in_session(deployed.connector_id)
    return (
        f"Successfully deployed source '{source_name}' with ID '{deployed.connector_id}'"
        f" and URL: {deployed.connector_url}"
    )
Deploy a source connector to Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_destination_to_cloud(
    destination_name: Annotated[
        str,
        Field(description="The name to use when deploying the destination."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-postgres')."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    config: Annotated[
        dict | str | None,
        Field(
            description="The configuration for the destination connector.",
            default=None,
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    # Build a local, non-executing destination object so the config can be
    # resolved against the connector's spec and validated before deploying.
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    # Track the new resource so safe-mode destructive checks can allow it.
    register_guid_created_in_session(deployed_destination.connector_id)
    # Include the management URL for consistency with deploy_source_to_cloud
    # and deploy_noop_destination_to_cloud (previously only the ID was shown).
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )
Deploy a destination connector to Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    # Accepts either a single stream name or a list; normalize to a list.
    streams: list[str] = resolve_list_of_strings(selected_streams)
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    new_connection = ws.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=streams,
        table_prefix=table_prefix,
    )

    # Track the new resource so safe-mode destructive checks can allow it.
    register_guid_created_in_session(new_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{new_connection.connection_id}' and "
        f"URL: {new_connection.connection_url}"
    )
Create a connection between a deployed source and destination on Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)
    result = conn.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Fire-and-forget path: report the job handle without polling for status.
    if not wait:
        return f"Sync started. Job ID is '{result.job_id}' and job URL is: {result.job_url}"

    final_status = result.get_job_status()
    return (
        f"Sync completed with status: {final_status}. "
        f"Job ID is '{result.job_id}' and "
        f"job URL is: {result.job_url}"
    )
Run a sync job on Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, and organization info.
    """
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)

    # Look up workspace details via the public API, reusing the workspace
    # object's own credentials.
    ws_details = api_util.get_workspace(
        workspace_id=ws.workspace_id,
        api_root=ws.api_root,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
        bearer_token=ws.bearer_token,
    )

    # Organization info requires ORGANIZATION_READER permission, which
    # workspace-scoped credentials may lack — degrade gracefully instead of
    # raising.
    org = ws.get_organization(raise_on_error=False)
    if org:
        org_id = org.organization_id
        org_name = org.organization_name
    else:
        org_id = "[unavailable - requires ORGANIZATION_READER permission]"
        org_name = None

    return CloudWorkspaceResult(
        workspace_id=ws_details.workspace_id,
        workspace_name=ws_details.name,
        workspace_url=ws.workspace_url,
        organization_id=org_id,
        organization_name=org_name,
    )
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace details including workspace ID, name, and organization info.
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    noop_destination = get_noop_destination()
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    deployed = ws.deploy_destination(
        name=name,
        destination=noop_destination,
        unique=unique,
    )
    # Track the new resource so safe-mode destructive checks can allow it.
    register_guid_created_in_session(deployed.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed.connector_id}' and "
        f"URL: {deployed.connector_url}"
    )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)

    # When job_id is None the latest job is looked up.
    sync_result: cloud.SyncResult | None = conn.get_sync_result(job_id=job_id)
    if not sync_result:
        return {"status": None, "job_id": None, "attempts": []}

    attempt_entries: list[dict[str, Any]] = []
    if include_attempts:
        attempt_entries = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in sync_result.get_attempts()
        ]

    return {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": attempt_entries,
    }
Get the status of a sync job from the Airbyte Cloud.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool allows you to retrieve a list of sync jobs for a connection,
    with control over ordering and pagination. By default, jobs are returned
    newest-first (from_tail=True).
    """
    # An explicit offset and newest-first ordering are mutually exclusive.
    if jobs_offset is not None and from_tail is True:
        raise PyAirbyteInputError(
            message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.",
            context={"jobs_offset": jobs_offset, "from_tail": from_tail},
        )

    # Unspecified ordering: newest-first unless an offset was provided.
    if from_tail is None:
        from_tail = jobs_offset is None

    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)

    # Cap at 500 to avoid overloading agent context
    effective_limit = 20 if max_jobs <= 0 else min(max_jobs, 500)

    results = conn.get_previous_sync_logs(
        limit=effective_limit,
        offset=jobs_offset,
        from_tail=from_tail,
    )

    job_entries = [
        SyncJobResult(
            job_id=item.job_id,
            status=str(item.get_job_status()),
            bytes_synced=item.bytes_synced,
            records_synced=item.records_synced,
            start_time=item.start_time.isoformat(),
            job_url=item.job_url,
        )
        for item in results
    ]

    return SyncJobListResult(
        jobs=job_entries,
        jobs_count=len(job_entries),
        jobs_offset=jobs_offset or 0,
        from_tail=from_tail,
    )
List sync jobs for a connection with pagination support.
This tool allows you to retrieve a list of sync jobs for a connection, with control over ordering and pagination. By default, jobs are returned newest-first (from_tail=True).
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_source_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter sources by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudSourceResult]:
    """List all deployed source connectors in the Airbyte Cloud workspace."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    found = ws.list_sources()

    # Optional case-insensitive name filter.
    if name_contains:
        fragment = name_contains.lower()
        found = [src for src in found if src.name is not None and fragment in src.name.lower()]

    # Optional truncation of the result set.
    if max_items_limit is not None:
        found = found[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudSourceResult(
            id=src.source_id,
            name=cast(str, src.name),
            url=cast(str, src.connector_url),
        )
        for src in found
    ]
List all deployed source connectors in the Airbyte Cloud workspace.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_destination_connectors(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter destinations by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudDestinationResult]:
    """List all deployed destination connectors in the Airbyte Cloud workspace."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    found = ws.list_destinations()

    # Optional case-insensitive name filter.
    if name_contains:
        fragment = name_contains.lower()
        found = [dst for dst in found if dst.name is not None and fragment in dst.name.lower()]

    # Optional truncation of the result set.
    if max_items_limit is not None:
        found = found[:max_items_limit]

    # Note: name and url are guaranteed non-null from list API responses
    return [
        CloudDestinationResult(
            id=dst.destination_id,
            name=cast(str, dst.name),
            url=cast(str, dst.connector_url),
        )
        for dst in found
    ]
List all deployed destination connectors in the Airbyte Cloud workspace.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the source to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudSourceDetails:
    """Get detailed information about a specific deployed source connector."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    src = ws.get_source(source_id=source_id)

    # Reading `.name` forces the lazy `_connector_info` to be populated
    # before we dereference it below.
    resolved_name = cast(str, src.name)

    return CloudSourceDetails(
        source_id=src.source_id,
        source_name=resolved_name,
        source_url=src.connector_url,
        connector_definition_id=src._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )
Get detailed information about a specific deployed source connector.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the destination to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudDestinationDetails:
    """Get detailed information about a specific deployed destination connector."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    dst = ws.get_destination(destination_id=destination_id)

    # Reading `.name` forces the lazy `_connector_info` to be populated
    # before we dereference it below.
    resolved_name = cast(str, dst.name)

    return CloudDestinationDetails(
        destination_id=dst.destination_id,
        destination_name=resolved_name,
        destination_url=dst.connector_url,
        connector_definition_id=dst._connector_info.definition_id,  # noqa: SLF001 # type: ignore[union-attr]
    )
Get detailed information about a specific deployed destination connector.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_connection(
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to describe."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudConnectionDetails:
    """Get detailed information about a specific deployed connection."""
    ws: CloudWorkspace = _get_cloud_workspace(workspace_id)
    conn = ws.get_connection(connection_id=connection_id)

    src = conn.source
    dst = conn.destination

    return CloudConnectionDetails(
        connection_id=conn.connection_id,
        connection_name=cast(str, conn.name),
        connection_url=cast(str, conn.connection_url),
        source_id=conn.source_id,
        source_name=cast(str, src.name),
        destination_id=conn.destination_id,
        destination_name=cast(str, dst.name),
        selected_streams=conn.stream_names,
        table_prefix=conn.table_prefix,
    )
Get detailed information about a specific deployed connection.
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # `line_offset` and `from_tail` are mutually exclusive options.
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    # When neither paging option is given, default to tailing the log.
    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)
    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()
    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    # Pick the requested attempt, or the highest-numbered one by default.
    if attempt_number is None:
        target_attempt = max(attempts, key=lambda att: att.attempt_number)
    else:
        target_attempt = next(
            (att for att in attempts if att.attempt_number == attempt_number),
            None,
        )
        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )

    logs = target_attempt.get_full_log_text()
    if not logs:
        # No log text at all: return a placeholder message with zero lines.
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # A max_lines of 0 means "no limit".
    limit = total_lines if max_lines == 0 else max_lines

    # Window selection: tail the log, or read forward from the given offset.
    if from_tail:
        start_index = max(0, total_lines - limit)
    else:
        start_index = line_offset or 0
    window = log_lines[start_index : start_index + limit]

    return LogReadResult(
        log_text="\n".join(window),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(window),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    all_connections = workspace.list_connections()

    # Optional case-insensitive name filter.
    if name_contains:
        lowered = name_contains.lower()
        all_connections = [
            conn
            for conn in all_connections
            if conn.name is not None and lowered in conn.name.lower()
        ]

    # Failure filtering needs status info, so enable it implicitly.
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []
    for conn in all_connections:
        status_str: str | None = None
        completed_job_id: int | None = None
        completed_job_time: str | None = None
        running_job_id: int | None = None
        running_job_start: str | None = None

        if with_connection_status:
            completed_status_enum = None  # Keep the raw enum for FAILED_STATUSES check.
            for sync_result in conn.get_previous_sync_logs(limit=5):
                job_status = sync_result.get_job_status()
                if not sync_result.is_job_complete():
                    # Still running: record it, keep scanning for a completed job.
                    running_job_id = sync_result.job_id
                    running_job_start = sync_result.start_time.isoformat()
                    continue

                completed_status_enum = job_status
                status_str = str(job_status.value) if job_status else None
                completed_job_id = sync_result.job_id
                completed_job_time = sync_result.start_time.isoformat()
                break

            # Drop connections whose last completed job did not fail/cancel.
            if failing_connections_only and (
                completed_status_enum is None
                or completed_status_enum not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=conn.connection_id,
                name=cast(str, conn.name),
                url=cast(str, conn.connection_url),
                source_id=conn.source_id,
                destination_id=conn.destination_id,
                last_job_status=status_str,
                last_job_id=completed_job_id,
                last_job_time=completed_job_time,
                currently_running_job_id=running_job_id,
                currently_running_job_start_time=running_job_start,
            )
        )

        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    credentials = resolve_cloud_credentials()

    # Resolve the org by ID or exact name before listing workspaces.
    org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
    )

    raw_workspaces = api_util.list_workspaces_in_organization(
        organization_id=org_id,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    results: list[CloudWorkspaceResult] = []
    for entry in raw_workspaces:
        results.append(
            CloudWorkspaceResult(
                workspace_id=entry.get("workspaceId", ""),
                workspace_name=entry.get("name", ""),
                organization_id=entry.get("organizationId", ""),
            )
        )
    return results
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    credentials = resolve_cloud_credentials()

    # Look up the organization by ID or by exact name.
    organization = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=credentials.api_root,
        client_id=credentials.client_id,
        client_secret=credentials.client_secret,
        bearer_token=credentials.bearer_token,
    )

    return CloudOrganizationResult(
        id=organization.organization_id,
        name=organization.organization_name,
        email=organization.email,
    )
@mcp_tool(
    domain="cloud",
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # A single-line string is treated as a file path; multi-line text is inline YAML.
    manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or a named secret.
    resolved_testing_values: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        resolved_testing_values = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=resolved_testing_values,
    )
    # Record the new GUID so session-scoped destructive tools can verify ownership.
    register_guid_created_in_session(custom_source.definition_id)

    description = _get_custom_source_definition_description(
        custom_source=custom_source,
    )
    return f"Successfully published custom YAML source definition:\n{description}\n"
@mcp_tool(
    domain="cloud",
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    summaries: list[dict[str, Any]] = []
    for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
        summaries.append(
            {
                "definition_id": definition.definition_id,
                "name": definition.name,
                "version": definition.version,
                "connector_builder_project_url": definition.connector_builder_project_url,
            }
        )
    return summaries
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    # Destructive tool: verify this GUID was created in the current session.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)

    nothing_to_update = (
        manifest_yaml is None
        and testing_values is None
        and testing_values_secret_name is None
    )
    if nothing_to_update:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    # A single-line string is treated as a file path; multi-line text is inline YAML.
    manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or a named secret.
    resolved_testing_values: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        resolved_testing_values = (
            resolve_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    custom_source: CustomCloudSourceDefinition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    if manifest is not None:
        custom_source = custom_source.update_definition(
            manifest_yaml=manifest,
            pre_validate=pre_validate,
        )

    if resolved_testing_values is not None:
        custom_source.set_testing_values(resolved_testing_values)

    description = _get_custom_source_definition_description(
        custom_source=custom_source,
    )
    return f"Successfully updated custom YAML source definition:\n{description}"
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    # Destructive tool: verify this GUID was created in the current session.
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    found_name: str = definition.name

    # The caller-supplied name must exactly match the real resource name.
    if found_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{found_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": found_name,
            },
        )

    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
    )
    return f"Successfully deleted custom source definition '{found_name}' (ID: {definition_id})"
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Destructive tool: verify this GUID was created in the current session.
    check_guid_created_in_session(source_id)

    # Accept an optional workspace_id (defaulting to the env var) for
    # consistency with the other cloud tools in this module.
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
@mcp_tool(
    domain="cloud",
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ] = None,
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    # Destructive tool: verify this GUID was created in the current session.
    check_guid_created_in_session(destination_id)

    # Accept an optional workspace_id (defaulting to the env var) for
    # consistency with the other cloud tools in this module.
    workspace: CloudWorkspace = _get_cloud_workspace(workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
1932@mcp_tool( 1933 domain="cloud", 1934 destructive=True, 1935 open_world=True, 1936 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1937) 1938def permanently_delete_cloud_connection( 1939 connection_id: Annotated[ 1940 str, 1941 Field(description="The ID of the connection to delete."), 1942 ], 1943 name: Annotated[ 1944 str, 1945 Field(description="The expected name of the connection (for verification)."), 1946 ], 1947 *, 1948 cascade_delete_source: Annotated[ 1949 bool, 1950 Field( 1951 description=( 1952 "Whether to also delete the source connector associated with this connection." 1953 ), 1954 default=False, 1955 ), 1956 ] = False, 1957 cascade_delete_destination: Annotated[ 1958 bool, 1959 Field( 1960 description=( 1961 "Whether to also delete the destination connector associated with this connection." 1962 ), 1963 default=False, 1964 ), 1965 ] = False, 1966) -> str: 1967 """Permanently delete a connection from Airbyte Cloud. 1968 1969 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 1970 (case insensitive). 1971 1972 If the connection does not meet this requirement, the deletion will be rejected with a 1973 helpful error message. Instruct the user to rename the connection appropriately to authorize 1974 the deletion. 1975 1976 The provided name must match the actual name of the connection for the operation to proceed. 1977 This is a safety measure to ensure you are deleting the correct resource. 1978 """ 1979 check_guid_created_in_session(connection_id) 1980 workspace: CloudWorkspace = _get_cloud_workspace() 1981 connection = workspace.get_connection(connection_id=connection_id) 1982 actual_name: str = cast(str, connection.name) 1983 1984 # Verify the name matches 1985 if actual_name != name: 1986 raise PyAirbyteInputError( 1987 message=( 1988 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1989 "The provided name must exactly match the connection's actual name. 
" 1990 "This is a safety measure to prevent accidental deletion." 1991 ), 1992 context={ 1993 "connection_id": connection_id, 1994 "expected_name": name, 1995 "actual_name": actual_name, 1996 }, 1997 ) 1998 1999 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2000 workspace.permanently_delete_connection( 2001 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2002 connection=connection_id, 2003 cascade_delete_source=cascade_delete_source, 2004 cascade_delete_destination=cascade_delete_destination, 2005 ) 2006 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case-insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.
The provided name must match the actual name of the connection for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
2009@mcp_tool( 2010 domain="cloud", 2011 open_world=True, 2012 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2013) 2014def rename_cloud_source( 2015 source_id: Annotated[ 2016 str, 2017 Field(description="The ID of the deployed source to rename."), 2018 ], 2019 name: Annotated[ 2020 str, 2021 Field(description="New name for the source."), 2022 ], 2023 *, 2024 workspace_id: Annotated[ 2025 str | None, 2026 Field( 2027 description=WORKSPACE_ID_TIP_TEXT, 2028 default=None, 2029 ), 2030 ], 2031) -> str: 2032 """Rename a deployed source connector on Airbyte Cloud.""" 2033 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2034 source = workspace.get_source(source_id=source_id) 2035 source.rename(name=name) 2036 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
Rename a deployed source connector on Airbyte Cloud.
2039@mcp_tool( 2040 domain="cloud", 2041 destructive=True, 2042 open_world=True, 2043 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2044) 2045def update_cloud_source_config( 2046 source_id: Annotated[ 2047 str, 2048 Field(description="The ID of the deployed source to update."), 2049 ], 2050 config: Annotated[ 2051 dict | str, 2052 Field( 2053 description="New configuration for the source connector.", 2054 ), 2055 ], 2056 config_secret_name: Annotated[ 2057 str | None, 2058 Field( 2059 description="The name of the secret containing the configuration.", 2060 default=None, 2061 ), 2062 ] = None, 2063 *, 2064 workspace_id: Annotated[ 2065 str | None, 2066 Field( 2067 description=WORKSPACE_ID_TIP_TEXT, 2068 default=None, 2069 ), 2070 ], 2071) -> str: 2072 """Update a deployed source connector's configuration on Airbyte Cloud. 2073 2074 This is a destructive operation that can break existing connections if the 2075 configuration is changed incorrectly. Use with caution. 2076 """ 2077 check_guid_created_in_session(source_id) 2078 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2079 source = workspace.get_source(source_id=source_id) 2080 2081 config_dict = resolve_config( 2082 config=config, 2083 config_secret_name=config_secret_name, 2084 config_spec_jsonschema=None, # We don't have the spec here 2085 ) 2086 2087 source.update_config(config=config_dict) 2088 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
2091@mcp_tool( 2092 domain="cloud", 2093 open_world=True, 2094 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2095) 2096def rename_cloud_destination( 2097 destination_id: Annotated[ 2098 str, 2099 Field(description="The ID of the deployed destination to rename."), 2100 ], 2101 name: Annotated[ 2102 str, 2103 Field(description="New name for the destination."), 2104 ], 2105 *, 2106 workspace_id: Annotated[ 2107 str | None, 2108 Field( 2109 description=WORKSPACE_ID_TIP_TEXT, 2110 default=None, 2111 ), 2112 ], 2113) -> str: 2114 """Rename a deployed destination connector on Airbyte Cloud.""" 2115 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2116 destination = workspace.get_destination(destination_id=destination_id) 2117 destination.rename(name=name) 2118 return ( 2119 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2120 f"URL: {destination.connector_url}" 2121 )
Rename a deployed destination connector on Airbyte Cloud.
2124@mcp_tool( 2125 domain="cloud", 2126 destructive=True, 2127 open_world=True, 2128 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2129) 2130def update_cloud_destination_config( 2131 destination_id: Annotated[ 2132 str, 2133 Field(description="The ID of the deployed destination to update."), 2134 ], 2135 config: Annotated[ 2136 dict | str, 2137 Field( 2138 description="New configuration for the destination connector.", 2139 ), 2140 ], 2141 config_secret_name: Annotated[ 2142 str | None, 2143 Field( 2144 description="The name of the secret containing the configuration.", 2145 default=None, 2146 ), 2147 ], 2148 *, 2149 workspace_id: Annotated[ 2150 str | None, 2151 Field( 2152 description=WORKSPACE_ID_TIP_TEXT, 2153 default=None, 2154 ), 2155 ], 2156) -> str: 2157 """Update a deployed destination connector's configuration on Airbyte Cloud. 2158 2159 This is a destructive operation that can break existing connections if the 2160 configuration is changed incorrectly. Use with caution. 2161 """ 2162 check_guid_created_in_session(destination_id) 2163 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2164 destination = workspace.get_destination(destination_id=destination_id) 2165 2166 config_dict = resolve_config( 2167 config=config, 2168 config_secret_name=config_secret_name, 2169 config_spec_jsonschema=None, # We don't have the spec here 2170 ) 2171 2172 destination.update_config(config=config_dict) 2173 return ( 2174 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2175 )
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
2178@mcp_tool( 2179 domain="cloud", 2180 open_world=True, 2181 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2182) 2183def rename_cloud_connection( 2184 connection_id: Annotated[ 2185 str, 2186 Field(description="The ID of the connection to rename."), 2187 ], 2188 name: Annotated[ 2189 str, 2190 Field(description="New name for the connection."), 2191 ], 2192 *, 2193 workspace_id: Annotated[ 2194 str | None, 2195 Field( 2196 description=WORKSPACE_ID_TIP_TEXT, 2197 default=None, 2198 ), 2199 ], 2200) -> str: 2201 """Rename a connection on Airbyte Cloud.""" 2202 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2203 connection = workspace.get_connection(connection_id=connection_id) 2204 connection.rename(name=name) 2205 return ( 2206 f"Successfully renamed connection '{connection_id}' to '{name}'. " 2207 f"URL: {connection.connection_url}" 2208 )
Rename a connection on Airbyte Cloud.
2211@mcp_tool( 2212 domain="cloud", 2213 destructive=True, 2214 open_world=True, 2215 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2216) 2217def set_cloud_connection_table_prefix( 2218 connection_id: Annotated[ 2219 str, 2220 Field(description="The ID of the connection to update."), 2221 ], 2222 prefix: Annotated[ 2223 str, 2224 Field(description="New table prefix to use when syncing to the destination."), 2225 ], 2226 *, 2227 workspace_id: Annotated[ 2228 str | None, 2229 Field( 2230 description=WORKSPACE_ID_TIP_TEXT, 2231 default=None, 2232 ), 2233 ], 2234) -> str: 2235 """Set the table prefix for a connection on Airbyte Cloud. 2236 2237 This is a destructive operation that can break downstream dependencies if the 2238 table prefix is changed incorrectly. Use with caution. 2239 """ 2240 check_guid_created_in_session(connection_id) 2241 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2242 connection = workspace.get_connection(connection_id=connection_id) 2243 connection.set_table_prefix(prefix=prefix) 2244 return ( 2245 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2246 f"URL: {connection.connection_url}" 2247 )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
2250@mcp_tool( 2251 domain="cloud", 2252 destructive=True, 2253 open_world=True, 2254 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2255) 2256def set_cloud_connection_selected_streams( 2257 connection_id: Annotated[ 2258 str, 2259 Field(description="The ID of the connection to update."), 2260 ], 2261 stream_names: Annotated[ 2262 str | list[str], 2263 Field( 2264 description=( 2265 "The selected stream names to sync within the connection. " 2266 "Must be an explicit stream name or list of streams." 2267 ) 2268 ), 2269 ], 2270 *, 2271 workspace_id: Annotated[ 2272 str | None, 2273 Field( 2274 description=WORKSPACE_ID_TIP_TEXT, 2275 default=None, 2276 ), 2277 ], 2278) -> str: 2279 """Set the selected streams for a connection on Airbyte Cloud. 2280 2281 This is a destructive operation that can break existing connections if the 2282 stream selection is changed incorrectly. Use with caution. 2283 """ 2284 check_guid_created_in_session(connection_id) 2285 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2286 connection = workspace.get_connection(connection_id=connection_id) 2287 2288 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2289 connection.set_selected_streams(stream_names=resolved_streams_list) 2290 2291 return ( 2292 f"Successfully set selected streams for connection '{connection_id}' " 2293 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2294 )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
2297@mcp_tool( 2298 domain="cloud", 2299 read_only=True, 2300 idempotent=True, 2301 open_world=True, 2302 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2303) 2304def get_connection_artifact( 2305 connection_id: Annotated[ 2306 str, 2307 Field(description="The ID of the Airbyte Cloud connection."), 2308 ], 2309 artifact_type: Annotated[ 2310 Literal["state", "catalog"], 2311 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2312 ], 2313 *, 2314 workspace_id: Annotated[ 2315 str | None, 2316 Field( 2317 description=WORKSPACE_ID_TIP_TEXT, 2318 default=None, 2319 ), 2320 ], 2321) -> dict[str, Any] | list[dict[str, Any]]: 2322 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2323 2324 Retrieves the specified artifact for a connection: 2325 - 'state': Returns the persisted state for incremental syncs as a list of 2326 stream state objects, or {"ERROR": "..."} if no state is set. 2327 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2328 or {"ERROR": "..."} if not found. 2329 """ 2330 workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) 2331 connection = workspace.get_connection(connection_id=connection_id) 2332 2333 if artifact_type == "state": 2334 result = connection.get_state_artifacts() 2335 if result is None: 2336 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2337 return result 2338 2339 # artifact_type == "catalog" 2340 result = connection.get_catalog_artifact() 2341 if result is None: 2342 return {"ERROR": "No catalog found for this connection"} 2343 return result
Get a connection artifact (state or catalog) from Airbyte Cloud.
Retrieves the specified artifact for a connection:
- 'state': Returns the persisted state for incremental syncs as a list of stream state objects, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.