airbyte.mcp.cloud
Airbyte Cloud MCP operations.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Airbyte Cloud MCP operations.""" 3 4from pathlib import Path 5from typing import Annotated, Any, Literal, cast 6 7from airbyte_api.models import JobTypeEnum 8from fastmcp import Context, FastMCP 9from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools 10from pydantic import BaseModel, Field 11 12from airbyte import cloud, get_destination, get_source 13from airbyte._util import api_util 14from airbyte.cloud.connectors import CustomCloudSourceDefinition 15from airbyte.cloud.constants import FAILED_STATUSES 16from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace 17from airbyte.constants import ( 18 MCP_CONFIG_API_URL, 19 MCP_CONFIG_BEARER_TOKEN, 20 MCP_CONFIG_CLIENT_ID, 21 MCP_CONFIG_CLIENT_SECRET, 22 MCP_CONFIG_WORKSPACE_ID, 23) 24from airbyte.destinations.util import get_noop_destination 25from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError 26from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings 27from airbyte.mcp._tool_utils import ( 28 AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET, 29 check_guid_created_in_session, 30 register_guid_created_in_session, 31) 32from airbyte.secrets import SecretString 33 34 35CLOUD_AUTH_TIP_TEXT = ( 36 "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, " 37 "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables " 38 "will be used to authenticate with the Airbyte Cloud API." 39) 40WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var." 41 42 43class CloudSourceResult(BaseModel): 44 """Information about a deployed source connector in Airbyte Cloud.""" 45 46 id: str 47 """The source ID.""" 48 name: str 49 """Display name of the source.""" 50 url: str 51 """Web URL for managing this source in Airbyte Cloud.""" 52 53 54class CloudDestinationResult(BaseModel): 55 """Information about a deployed destination connector in Airbyte Cloud.""" 56 57 id: str 58 """The destination ID.""" 59 name: str 60 """Display name of the destination.""" 61 url: str 62 """Web URL for managing this destination in Airbyte Cloud.""" 63 64 65class CloudConnectionResult(BaseModel): 66 """Information about a deployed connection in Airbyte Cloud.""" 67 68 id: str 69 """The connection ID.""" 70 name: str 71 """Display name of the connection.""" 72 url: str 73 """Web URL for managing this connection in Airbyte Cloud.""" 74 source_id: str 75 """ID of the source used by this connection.""" 76 destination_id: str 77 """ID of the destination used by this connection.""" 78 last_job_status: str | None = None 79 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 80 Only populated when with_connection_status=True.""" 81 last_job_id: int | None = None 82 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 83 last_job_time: str | None = None 84 """ISO 8601 timestamp of the most recent completed sync. 85 Only populated when with_connection_status=True.""" 86 currently_running_job_id: int | None = None 87 """Job ID of a currently running sync, if any. 88 Only populated when with_connection_status=True.""" 89 currently_running_job_start_time: str | None = None 90 """ISO 8601 timestamp of when the currently running sync started. 91 Only populated when with_connection_status=True.""" 92 93 94class CloudSourceDetails(BaseModel): 95 """Detailed information about a deployed source connector in Airbyte Cloud.""" 96 97 source_id: str 98 """The source ID.""" 99 source_name: str 100 """Display name of the source.""" 101 source_url: str 102 """Web URL for managing this source in Airbyte Cloud.""" 103 connector_definition_id: str 104 """The connector definition ID (e.g., the ID for 'source-postgres').""" 105 106 107class CloudDestinationDetails(BaseModel): 108 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 109 110 destination_id: str 111 """The destination ID.""" 112 destination_name: str 113 """Display name of the destination.""" 114 destination_url: str 115 """Web URL for managing this destination in Airbyte Cloud.""" 116 connector_definition_id: str 117 """The connector definition ID (e.g., the ID for 'destination-snowflake').""" 118 119 120class CloudConnectionDetails(BaseModel): 121 """Detailed information about a deployed connection in Airbyte Cloud.""" 122 123 connection_id: str 124 """The connection ID.""" 125 connection_name: str 126 """Display name of the connection.""" 127 connection_url: str 128 """Web URL for managing this connection in Airbyte Cloud.""" 129 source_id: str 130 """ID of the source used by this connection.""" 131 source_name: str 132 """Display name of the source.""" 133 destination_id: str 134 """ID of the destination used by this connection.""" 135 destination_name: str 136 """Display name of the destination.""" 137 selected_streams: list[str] 138 """List of stream names selected for syncing.""" 139 table_prefix: str | None 140 """Table prefix applied when syncing to the destination.""" 141 142 143class CloudOrganizationResult(BaseModel): 144 """Information about an organization in Airbyte Cloud.""" 145 146 id: str 147 """The organization ID.""" 148 name: str 149 """Display name of the organization.""" 150 email: str 151 """Email associated with the organization.""" 152 payment_status: str | None = None 153 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 154 When 'disabled', syncs are blocked due to unpaid invoices.""" 155 subscription_status: str | None = None 156 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 157 'unsubscribed').""" 158 is_account_locked: bool = False 159 """Whether the account is locked due to billing issues. 160 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 161 Defaults to False unless we have affirmative evidence of a locked state.""" 162 163 164class CloudWorkspaceResult(BaseModel): 165 """Information about a workspace in Airbyte Cloud.""" 166 167 workspace_id: str 168 """The workspace ID.""" 169 workspace_name: str 170 """Display name of the workspace.""" 171 workspace_url: str | None = None 172 """URL to access the workspace in Airbyte Cloud.""" 173 organization_id: str 174 """ID of the organization (requires ORGANIZATION_READER permission).""" 175 organization_name: str | None = None 176 """Name of the organization (requires ORGANIZATION_READER permission).""" 177 payment_status: str | None = None 178 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 179 When 'disabled', syncs are blocked due to unpaid invoices. 180 Requires ORGANIZATION_READER permission.""" 181 subscription_status: str | None = None 182 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 183 'unsubscribed'). Requires ORGANIZATION_READER permission.""" 184 is_account_locked: bool = False 185 """Whether the account is locked due to billing issues. 186 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 187 Defaults to False unless we have affirmative evidence of a locked state. 188 Requires ORGANIZATION_READER permission.""" 189 190 191class LogReadResult(BaseModel): 192 """Result of reading sync logs with pagination support.""" 193 194 job_id: int 195 """The job ID the logs belong to.""" 196 attempt_number: int 197 """The attempt number the logs belong to.""" 198 log_text: str 199 """The string containing the log text we are returning.""" 200 log_text_start_line: int 201 """1-based line index of the first line returned.""" 202 log_text_line_count: int 203 """Count of lines we are returning.""" 204 total_log_lines_available: int 205 """Total number of log lines available, shows if any lines were missed due to the limit.""" 206 207 208class SyncJobResult(BaseModel): 209 """Information about a sync job.""" 210 211 job_id: int 212 """The job ID.""" 213 status: str 214 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 215 bytes_synced: int 216 """Number of bytes synced in this job.""" 217 records_synced: int 218 """Number of records synced in this job.""" 219 start_time: str 220 """ISO 8601 timestamp of when the job started.""" 221 job_url: str 222 """URL to view the job in Airbyte Cloud.""" 223 224 225class SyncJobListResult(BaseModel): 226 """Result of listing sync jobs with pagination support.""" 227 228 jobs: list[SyncJobResult] 229 """List of sync jobs.""" 230 jobs_count: int 231 """Number of jobs returned in this response.""" 232 jobs_offset: int 233 """Offset used for this request (0 if not specified).""" 234 from_tail: bool 235 """Whether jobs are ordered newest-first (True) or oldest-first (False).""" 236 237 238def _get_cloud_workspace( 239 ctx: Context, 240 workspace_id: str | None = None, 241) -> CloudWorkspace: 242 """Get an authenticated CloudWorkspace. 243 244 Resolves credentials from multiple sources via MCP config args in order: 245 1. HTTP headers (when running as MCP server with HTTP/SSE transport) 246 2. Environment variables 247 248 The ctx parameter provides access to MCP config values that are resolved 249 from HTTP headers or environment variables based on the config args 250 defined in server.py. 251 """ 252 resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID) 253 if not resolved_workspace_id: 254 raise PyAirbyteInputError( 255 message="Workspace ID is required but not provided.", 256 guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.", 257 ) 258 259 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 260 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 261 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 262 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 263 264 return CloudWorkspace( 265 workspace_id=resolved_workspace_id, 266 client_id=SecretString(client_id) if client_id else None, 267 client_secret=SecretString(client_secret) if client_secret else None, 268 bearer_token=SecretString(bearer_token) if bearer_token else None, 269 api_root=api_url, 270 ) 271 272 273@mcp_tool( 274 open_world=True, 275 extra_help_text=CLOUD_AUTH_TIP_TEXT, 276) 277def deploy_source_to_cloud( 278 ctx: Context, 279 source_name: Annotated[ 280 str, 281 Field(description="The name to use when deploying the source."), 282 ], 283 source_connector_name: Annotated[ 284 str, 285 Field(description="The name of the source connector (e.g., 'source-faker')."), 286 ], 287 *, 288 workspace_id: Annotated[ 289 str | None, 290 Field( 291 description=WORKSPACE_ID_TIP_TEXT, 292 default=None, 293 ), 294 ], 295 config: Annotated[ 296 dict | str | None, 297 Field( 298 description="The configuration for the source connector.", 299 default=None, 300 ), 301 ], 302 config_secret_name: Annotated[ 303 str | None, 304 Field( 305 description="The name of the secret containing the configuration.", 306 default=None, 307 ), 308 ], 309 unique: Annotated[ 310 bool, 311 Field( 312 description="Whether to require a unique name.", 313 default=True, 314 ), 315 ], 316) -> str: 317 """Deploy a source connector to Airbyte Cloud.""" 318 source = get_source( 319 source_connector_name, 320 no_executor=True, 321 ) 322 config_dict = resolve_connector_config( 323 config=config, 324 config_secret_name=config_secret_name, 325 config_spec_jsonschema=source.config_spec, 326 ) 327 source.set_config(config_dict, validate=True) 328 329 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 330 deployed_source = workspace.deploy_source( 331 name=source_name, 332 source=source, 333 unique=unique, 334 ) 335 336 register_guid_created_in_session(deployed_source.connector_id) 337 return ( 338 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 339 f" and URL: {deployed_source.connector_url}" 340 ) 341 342 343@mcp_tool( 344 open_world=True, 345 extra_help_text=CLOUD_AUTH_TIP_TEXT, 346) 347def deploy_destination_to_cloud( 348 ctx: Context, 349 destination_name: Annotated[ 350 str, 351 Field(description="The name to use when deploying the destination."), 352 ], 353 destination_connector_name: Annotated[ 354 str, 355 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 356 ], 357 *, 358 workspace_id: Annotated[ 359 str | None, 360 Field( 361 description=WORKSPACE_ID_TIP_TEXT, 362 default=None, 363 ), 364 ], 365 config: Annotated[ 366 dict | str | None, 367 Field( 368 description="The configuration for the destination connector.", 369 default=None, 370 ), 371 ], 372 config_secret_name: Annotated[ 373 str | None, 374 Field( 375 description="The name of the secret containing the configuration.", 376 default=None, 377 ), 378 ], 379 unique: Annotated[ 380 bool, 381 Field( 382 description="Whether to require a unique name.", 383 default=True, 384 ), 385 ], 386) -> str: 387 """Deploy a destination connector to Airbyte Cloud.""" 388 destination = get_destination( 389 destination_connector_name, 390 no_executor=True, 391 ) 392 config_dict = resolve_connector_config( 393 config=config, 394 config_secret_name=config_secret_name, 395 config_spec_jsonschema=destination.config_spec, 396 ) 397 destination.set_config(config_dict, validate=True) 398 399 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 400 deployed_destination = workspace.deploy_destination( 401 name=destination_name, 402 destination=destination, 403 unique=unique, 404 ) 405 406 register_guid_created_in_session(deployed_destination.connector_id) 407 return ( 408 f"Successfully deployed destination '{destination_name}' " 409 f"with ID: {deployed_destination.connector_id}" 410 ) 411 412 413@mcp_tool( 414 open_world=True, 415 extra_help_text=CLOUD_AUTH_TIP_TEXT, 416) 417def create_connection_on_cloud( 418 ctx: Context, 419 connection_name: Annotated[ 420 str, 421 Field(description="The name of the connection."), 422 ], 423 source_id: Annotated[ 424 str, 425 Field(description="The ID of the deployed source."), 426 ], 427 destination_id: Annotated[ 428 str, 429 Field(description="The ID of the deployed destination."), 430 ], 431 selected_streams: Annotated[ 432 str | list[str], 433 Field( 434 description=( 435 "The selected stream names to sync within the connection. " 436 "Must be an explicit stream name or list of streams. " 437 "Cannot be empty or '*'." 438 ) 439 ), 440 ], 441 *, 442 workspace_id: Annotated[ 443 str | None, 444 Field( 445 description=WORKSPACE_ID_TIP_TEXT, 446 default=None, 447 ), 448 ], 449 table_prefix: Annotated[ 450 str | None, 451 Field( 452 description="Optional table prefix to use when syncing to the destination.", 453 default=None, 454 ), 455 ], 456) -> str: 457 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 458 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 459 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 460 deployed_connection = workspace.deploy_connection( 461 connection_name=connection_name, 462 source=source_id, 463 destination=destination_id, 464 selected_streams=resolved_streams_list, 465 table_prefix=table_prefix, 466 ) 467 468 register_guid_created_in_session(deployed_connection.connection_id) 469 return ( 470 f"Successfully created connection '{connection_name}' " 471 f"with ID '{deployed_connection.connection_id}' and " 472 f"URL: {deployed_connection.connection_url}" 473 ) 474 475 476@mcp_tool( 477 open_world=True, 478 extra_help_text=CLOUD_AUTH_TIP_TEXT, 479) 480def run_cloud_sync( 481 ctx: Context, 482 connection_id: Annotated[ 483 str, 484 Field(description="The ID of the Airbyte Cloud connection."), 485 ], 486 *, 487 workspace_id: Annotated[ 488 str | None, 489 Field( 490 description=WORKSPACE_ID_TIP_TEXT, 491 default=None, 492 ), 493 ], 494 wait: Annotated[ 495 bool, 496 Field( 497 description=( 498 "Whether to wait for the sync to complete. Since a sync can take between several " 499 "minutes and several hours, this option is not recommended for most " 500 "scenarios." 501 ), 502 default=False, 503 ), 504 ], 505 wait_timeout: Annotated[ 506 int, 507 Field( 508 description="Maximum time to wait for sync completion (seconds).", 509 default=300, 510 ), 511 ], 512) -> str: 513 """Run a sync job on Airbyte Cloud.""" 514 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 515 connection = workspace.get_connection(connection_id=connection_id) 516 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 517 518 if wait: 519 status = sync_result.get_job_status() 520 return ( 521 f"Sync completed with status: {status}. " 522 f"Job ID is '{sync_result.job_id}' and " 523 f"job URL is: {sync_result.job_url}" 524 ) 525 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}" 526 527 528@mcp_tool( 529 read_only=True, 530 idempotent=True, 531 open_world=True, 532 extra_help_text=CLOUD_AUTH_TIP_TEXT, 533) 534def check_airbyte_cloud_workspace( 535 ctx: Context, 536 *, 537 workspace_id: Annotated[ 538 str | None, 539 Field( 540 description=WORKSPACE_ID_TIP_TEXT, 541 default=None, 542 ), 543 ], 544) -> CloudWorkspaceResult: 545 """Check if we have a valid Airbyte Cloud connection and return workspace info. 546 547 Returns workspace details including workspace ID, name, organization info, and billing status. 548 """ 549 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 550 551 # Get workspace details from the public API using workspace's credentials 552 workspace_response = api_util.get_workspace( 553 workspace_id=workspace.workspace_id, 554 api_root=workspace.api_root, 555 client_id=workspace.client_id, 556 client_secret=workspace.client_secret, 557 bearer_token=workspace.bearer_token, 558 ) 559 560 # Try to get organization info (including billing), but fail gracefully if we don't have 561 # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the 562 # organization, which may not be available with workspace-scoped credentials. 563 organization = workspace.get_organization(raise_on_error=False) 564 565 return CloudWorkspaceResult( 566 workspace_id=workspace_response.workspace_id, 567 workspace_name=workspace_response.name, 568 workspace_url=workspace.workspace_url, 569 organization_id=( 570 organization.organization_id 571 if organization 572 else "[unavailable - requires ORGANIZATION_READER permission]" 573 ), 574 organization_name=organization.organization_name if organization else None, 575 payment_status=organization.payment_status if organization else None, 576 subscription_status=organization.subscription_status if organization else None, 577 is_account_locked=organization.is_account_locked if organization else False, 578 ) 579 580 581@mcp_tool( 582 open_world=True, 583 extra_help_text=CLOUD_AUTH_TIP_TEXT, 584) 585def deploy_noop_destination_to_cloud( 586 ctx: Context, 587 name: str = "No-op Destination", 588 *, 589 workspace_id: Annotated[ 590 str | None, 591 Field( 592 description=WORKSPACE_ID_TIP_TEXT, 593 default=None, 594 ), 595 ], 596 unique: bool = True, 597) -> str: 598 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 599 destination = get_noop_destination() 600 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 601 deployed_destination = workspace.deploy_destination( 602 name=name, 603 destination=destination, 604 unique=unique, 605 ) 606 register_guid_created_in_session(deployed_destination.connector_id) 607 return ( 608 f"Successfully deployed No-op Destination " 609 f"with ID '{deployed_destination.connector_id}' and " 610 f"URL: {deployed_destination.connector_url}" 611 ) 612 613 614@mcp_tool( 615 read_only=True, 616 idempotent=True, 617 open_world=True, 618 extra_help_text=CLOUD_AUTH_TIP_TEXT, 619) 620def get_cloud_sync_status( 621 ctx: Context, 622 connection_id: Annotated[ 623 str, 624 Field( 625 description="The ID of the Airbyte Cloud connection.", 626 ), 627 ], 628 job_id: Annotated[ 629 int | None, 630 Field( 631 description="Optional job ID. If not provided, the latest job will be used.", 632 default=None, 633 ), 634 ], 635 *, 636 workspace_id: Annotated[ 637 str | None, 638 Field( 639 description=WORKSPACE_ID_TIP_TEXT, 640 default=None, 641 ), 642 ], 643 include_attempts: Annotated[ 644 bool, 645 Field( 646 description="Whether to include detailed attempts information.", 647 default=False, 648 ), 649 ], 650) -> dict[str, Any]: 651 """Get the status of a sync job from the Airbyte Cloud.""" 652 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 653 connection = workspace.get_connection(connection_id=connection_id) 654 655 # If a job ID is provided, get the job by ID. 656 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 657 658 if not sync_result: 659 return {"status": None, "job_id": None, "attempts": []} 660 661 result = { 662 "status": sync_result.get_job_status(), 663 "job_id": sync_result.job_id, 664 "bytes_synced": sync_result.bytes_synced, 665 "records_synced": sync_result.records_synced, 666 "start_time": sync_result.start_time.isoformat(), 667 "job_url": sync_result.job_url, 668 "attempts": [], 669 } 670 671 if include_attempts: 672 attempts = sync_result.get_attempts() 673 result["attempts"] = [ 674 { 675 "attempt_number": attempt.attempt_number, 676 "attempt_id": attempt.attempt_id, 677 "status": attempt.status, 678 "bytes_synced": attempt.bytes_synced, 679 "records_synced": attempt.records_synced, 680 "created_at": attempt.created_at.isoformat(), 681 } 682 for attempt in attempts 683 ] 684 685 return result 686 687 688@mcp_tool( 689 read_only=True, 690 idempotent=True, 691 open_world=True, 692 extra_help_text=CLOUD_AUTH_TIP_TEXT, 693) 694def list_cloud_sync_jobs( 695 ctx: Context, 696 connection_id: Annotated[ 697 str, 698 Field(description="The ID of the Airbyte Cloud connection."), 699 ], 700 *, 701 workspace_id: Annotated[ 702 str | None, 703 Field( 704 description=WORKSPACE_ID_TIP_TEXT, 705 default=None, 706 ), 707 ], 708 max_jobs: Annotated[ 709 int, 710 Field( 711 description=( 712 "Maximum number of jobs to return. " 713 "Defaults to 20 if not specified. " 714 "Maximum allowed value is 500." 715 ), 716 default=20, 717 ), 718 ], 719 from_tail: Annotated[ 720 bool | None, 721 Field( 722 description=( 723 "When True, jobs are ordered newest-first (createdAt DESC). " 724 "When False, jobs are ordered oldest-first (createdAt ASC). " 725 "Defaults to True if `jobs_offset` is not specified. " 726 "Cannot combine `from_tail=True` with `jobs_offset`." 727 ), 728 default=None, 729 ), 730 ], 731 jobs_offset: Annotated[ 732 int | None, 733 Field( 734 description=( 735 "Number of jobs to skip from the beginning. " 736 "Cannot be combined with `from_tail=True`." 737 ), 738 default=None, 739 ), 740 ], 741 job_type: Annotated[ 742 JobTypeEnum | None, 743 Field( 744 description=( 745 "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. " 746 "If not specified, defaults to sync and reset jobs only (API default). " 747 "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs." 748 ), 749 default=None, 750 ), 751 ], 752) -> SyncJobListResult: 753 """List sync jobs for a connection with pagination support. 754 755 This tool allows you to retrieve a list of sync jobs for a connection, 756 with control over ordering and pagination. By default, jobs are returned 757 newest-first (from_tail=True). 758 """ 759 # Validate that jobs_offset and from_tail are not both set 760 if jobs_offset is not None and from_tail is True: 761 raise PyAirbyteInputError( 762 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 763 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 764 ) 765 766 # Default to from_tail=True if neither is specified 767 if from_tail is None and jobs_offset is None: 768 from_tail = True 769 elif from_tail is None: 770 from_tail = False 771 772 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 773 connection = workspace.get_connection(connection_id=connection_id) 774 775 # Cap at 500 to avoid overloading agent context 776 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 777 778 sync_results = connection.get_previous_sync_logs( 779 limit=effective_limit, 780 offset=jobs_offset, 781 from_tail=from_tail, 782 job_type=job_type, 783 ) 784 785 jobs = [ 786 SyncJobResult( 787 job_id=sync_result.job_id, 788 status=str(sync_result.get_job_status()), 789 bytes_synced=sync_result.bytes_synced, 790 records_synced=sync_result.records_synced, 791 start_time=sync_result.start_time.isoformat(), 792 job_url=sync_result.job_url, 793 ) 794 for sync_result in sync_results 795 ] 796 797 return SyncJobListResult( 798 jobs=jobs, 799 jobs_count=len(jobs), 800 jobs_offset=jobs_offset or 0, 801 from_tail=from_tail, 802 ) 803 804 805@mcp_tool( 806 read_only=True, 807 idempotent=True, 808 open_world=True, 809 extra_help_text=CLOUD_AUTH_TIP_TEXT, 810) 811def list_deployed_cloud_source_connectors( 812 ctx: Context, 813 *, 814 workspace_id: Annotated[ 815 str | None, 816 Field( 817 description=WORKSPACE_ID_TIP_TEXT, 818 default=None, 819 ), 820 ], 821 name_contains: Annotated[ 822 str | None, 823 Field( 824 description="Optional case-insensitive substring to filter sources by name", 825 default=None, 826 ), 827 ], 828 max_items_limit: Annotated[ 829 int | None, 830 Field( 831 description="Optional maximum number of items to return (default: no limit)", 832 default=None, 833 ), 834 ], 835) -> list[CloudSourceResult]: 836 """List all deployed source connectors in the Airbyte Cloud workspace.""" 837 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 838 sources = workspace.list_sources() 839 840 # Filter by name if requested 841 if name_contains: 842 needle = name_contains.lower() 843 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 844 845 # Apply limit if requested 846 if max_items_limit is not None: 847 sources = sources[:max_items_limit] 848 849 # Note: name and url are guaranteed non-null from list API responses 850 return [ 851 CloudSourceResult( 852 id=source.source_id, 853 name=cast(str, source.name), 854 url=cast(str, source.connector_url), 855 ) 856 for source in sources 857 ] 858 859 860@mcp_tool( 861 read_only=True, 862 idempotent=True, 863 open_world=True, 864 extra_help_text=CLOUD_AUTH_TIP_TEXT, 865) 866def list_deployed_cloud_destination_connectors( 867 ctx: Context, 868 *, 869 workspace_id: Annotated[ 870 str | None, 871 Field( 872 description=WORKSPACE_ID_TIP_TEXT, 873 default=None, 874 ), 875 ], 876 name_contains: Annotated[ 877 str | None, 878 Field( 879 description="Optional case-insensitive substring to filter destinations by name", 880 default=None, 881 ), 882 ], 883 max_items_limit: Annotated[ 884 int | None, 885 Field( 886 description="Optional maximum number of items to return (default: no limit)", 887 default=None, 888 ), 889 ], 890) -> list[CloudDestinationResult]: 891 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 892 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 893 destinations = workspace.list_destinations() 894 895 # Filter by name if requested 896 if name_contains: 897 needle = name_contains.lower() 898 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 899 900 # Apply limit if requested 901 if max_items_limit is not None: 902 destinations = destinations[:max_items_limit] 903 904 # Note: name and url are guaranteed non-null from list API responses 905 return [ 906 CloudDestinationResult( 907 id=destination.destination_id, 908 name=cast(str, destination.name), 909 url=cast(str, destination.connector_url), 910 ) 911 for destination in destinations 912 ] 913 914 915@mcp_tool( 916 read_only=True, 917 idempotent=True, 918 open_world=True, 919 extra_help_text=CLOUD_AUTH_TIP_TEXT, 920) 921def describe_cloud_source( 922 ctx: Context, 923 source_id: Annotated[ 924 str, 925 Field(description="The ID of the source to describe."), 926 ], 927 *, 928 workspace_id: Annotated[ 929 str | None, 930 Field( 931 description=WORKSPACE_ID_TIP_TEXT, 932 default=None, 933 ), 934 ], 935) -> CloudSourceDetails: 936 """Get detailed information about a specific deployed source connector.""" 937 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 938 source = workspace.get_source(source_id=source_id) 939 940 # Access name property to ensure _connector_info is populated 941 source_name = cast(str, source.name) 942 943 return CloudSourceDetails( 944 source_id=source.source_id, 945 source_name=source_name, 946 source_url=source.connector_url, 947 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 948 ) 949 950 951@mcp_tool( 952 read_only=True, 953 idempotent=True, 954 open_world=True, 955 extra_help_text=CLOUD_AUTH_TIP_TEXT, 956) 957def describe_cloud_destination( 958 ctx: Context, 959 destination_id: Annotated[ 960 str, 961 Field(description="The ID of the destination to describe."), 962 ], 963 *, 964 workspace_id: Annotated[ 965 str | None, 966 Field( 967 description=WORKSPACE_ID_TIP_TEXT, 968 default=None, 969 ), 970 ], 971) -> CloudDestinationDetails: 972 """Get detailed information about a specific deployed destination connector.""" 973 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 974 destination = workspace.get_destination(destination_id=destination_id) 975 976 # Access name property to ensure _connector_info is populated 977 destination_name = cast(str, destination.name) 978 979 return CloudDestinationDetails( 980 destination_id=destination.destination_id, 981 destination_name=destination_name, 982 destination_url=destination.connector_url, 983 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 984 ) 985 986 987@mcp_tool( 988 read_only=True, 989 idempotent=True, 990 open_world=True, 991 extra_help_text=CLOUD_AUTH_TIP_TEXT, 992) 993def describe_cloud_connection( 994 ctx: Context, 995 connection_id: Annotated[ 996 str, 997 Field(description="The ID of the connection to describe."), 998 ], 999 *, 1000 workspace_id: Annotated[ 1001 str | None, 1002 Field( 1003 description=WORKSPACE_ID_TIP_TEXT, 1004 default=None, 1005 ), 1006 ], 1007) -> CloudConnectionDetails: 1008 """Get detailed information about a specific deployed connection.""" 1009 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1010 connection = workspace.get_connection(connection_id=connection_id) 1011 1012 return CloudConnectionDetails( 1013 connection_id=connection.connection_id, 1014 connection_name=cast(str, connection.name), 1015 connection_url=cast(str, connection.connection_url), 1016 source_id=connection.source_id, 1017 source_name=cast(str, connection.source.name), 1018 destination_id=connection.destination_id, 1019 destination_name=cast(str, connection.destination.name), 1020 selected_streams=connection.stream_names, 1021 table_prefix=connection.table_prefix, 1022 ) 1023 1024 1025@mcp_tool( 1026 read_only=True, 1027 idempotent=True, 1028 open_world=True, 1029 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1030) 1031def get_cloud_sync_logs( 1032 ctx: Context, 1033 connection_id: Annotated[ 1034 str, 1035 Field(description="The ID of the Airbyte Cloud connection."), 1036 ], 1037 job_id: Annotated[ 1038 int | None, 1039 Field(description="Optional job ID. If not provided, the latest job will be used."), 1040 ] = None, 1041 attempt_number: Annotated[ 1042 int | None, 1043 Field( 1044 description="Optional attempt number. If not provided, the latest attempt will be used." 1045 ), 1046 ] = None, 1047 *, 1048 workspace_id: Annotated[ 1049 str | None, 1050 Field( 1051 description=WORKSPACE_ID_TIP_TEXT, 1052 default=None, 1053 ), 1054 ], 1055 max_lines: Annotated[ 1056 int, 1057 Field( 1058 description=( 1059 "Maximum number of lines to return. " 1060 "Defaults to 4000 if not specified. " 1061 "If '0' is provided, no limit is applied." 1062 ), 1063 default=4000, 1064 ), 1065 ], 1066 from_tail: Annotated[ 1067 bool | None, 1068 Field( 1069 description=( 1070 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 1071 "Defaults to True if `line_offset` is not specified. " 1072 "Cannot combine `from_tail=True` with `line_offset`." 1073 ), 1074 default=None, 1075 ), 1076 ], 1077 line_offset: Annotated[ 1078 int | None, 1079 Field( 1080 description=( 1081 "Number of lines to skip from the beginning of the logs. " 1082 "Cannot be combined with `from_tail=True`." 1083 ), 1084 default=None, 1085 ), 1086 ], 1087) -> LogReadResult: 1088 """Get the logs from a sync job attempt on Airbyte Cloud.""" 1089 # Validate that line_offset and from_tail are not both set 1090 if line_offset is not None and from_tail: 1091 raise PyAirbyteInputError( 1092 message="Cannot specify both 'line_offset' and 'from_tail' parameters.", 1093 context={"line_offset": line_offset, "from_tail": from_tail}, 1094 ) 1095 1096 if from_tail is None and line_offset is None: 1097 from_tail = True 1098 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1099 connection = workspace.get_connection(connection_id=connection_id) 1100 1101 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 1102 1103 if not sync_result: 1104 raise AirbyteMissingResourceError( 1105 resource_type="sync job", 1106 resource_name_or_id=connection_id, 1107 ) 1108 1109 attempts = sync_result.get_attempts() 1110 1111 if not attempts: 1112 raise AirbyteMissingResourceError( 1113 resource_type="sync attempt", 1114 resource_name_or_id=str(sync_result.job_id), 1115 ) 1116 1117 if attempt_number is not None: 1118 target_attempt = None 1119 for attempt in attempts: 1120 if attempt.attempt_number == attempt_number: 1121 target_attempt = attempt 1122 break 1123 1124 if target_attempt is None: 1125 raise AirbyteMissingResourceError( 1126 resource_type="sync attempt", 1127 resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}", 1128 ) 1129 else: 1130 target_attempt = max(attempts, key=lambda a: a.attempt_number) 1131 1132 logs = target_attempt.get_full_log_text() 1133 1134 if not logs: 1135 # Return empty result with zero lines 1136 return LogReadResult( 1137 log_text=( 1138 f"[No logs available for job '{sync_result.job_id}', " 1139 f"attempt {target_attempt.attempt_number}.]" 1140 ), 1141 log_text_start_line=1, 1142 log_text_line_count=0, 1143 total_log_lines_available=0, 1144 job_id=sync_result.job_id, 1145 attempt_number=target_attempt.attempt_number, 1146 ) 1147 1148 # Apply line limiting 1149 log_lines = logs.splitlines() 1150 total_lines = len(log_lines) 1151 1152 # Determine effective max_lines (0 means no limit) 1153 effective_max = total_lines if max_lines == 0 else max_lines 1154 1155 # Calculate start_index and slice based on from_tail or line_offset 1156 if from_tail: 1157 start_index = max(0, total_lines - effective_max) 1158 selected_lines = log_lines[start_index:][:effective_max] 1159 else: 1160 start_index = line_offset or 0 1161 selected_lines = log_lines[start_index : start_index + effective_max] 1162 1163 return LogReadResult( 1164 log_text="\n".join(selected_lines), 1165 log_text_start_line=start_index + 1, # Convert to 1-based index 1166 log_text_line_count=len(selected_lines), 1167 total_log_lines_available=total_lines, 1168 job_id=sync_result.job_id, 1169 attempt_number=target_attempt.attempt_number, 1170 ) 1171 1172 1173@mcp_tool( 1174 read_only=True, 1175 idempotent=True, 1176 open_world=True, 1177 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1178) 1179def list_deployed_cloud_connections( 1180 ctx: Context, 1181 *, 1182 workspace_id: Annotated[ 1183 str | None, 1184 Field( 1185 description=WORKSPACE_ID_TIP_TEXT, 1186 default=None, 1187 ), 1188 ], 1189 name_contains: Annotated[ 1190 str | None, 1191 Field( 1192 description="Optional case-insensitive substring to filter connections by name", 1193 default=None, 1194 ), 1195 ], 1196 max_items_limit: Annotated[ 1197 int | None, 1198 Field( 1199 description="Optional maximum number of items to return (default: no limit)", 1200 default=None, 1201 ), 1202 ], 1203 with_connection_status: Annotated[ 1204 bool | None, 1205 Field( 1206 description="If True, include status info for each connection's most recent sync job", 1207 default=False, 1208 ), 1209 ], 1210 failing_connections_only: Annotated[ 1211 bool | None, 1212 Field( 1213 description="If True, only return connections with failed/cancelled last sync", 1214 default=False, 1215 ), 1216 ], 1217) -> list[CloudConnectionResult]: 1218 """List all deployed connections in the Airbyte Cloud workspace. 1219 1220 When with_connection_status is True, each connection result will include 1221 information about the most recent sync job status, skipping over any 1222 currently in-progress syncs to find the last completed job. 1223 1224 When failing_connections_only is True, only connections where the most 1225 recent completed sync job failed or was cancelled will be returned. 1226 This implicitly enables with_connection_status. 1227 """ 1228 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1229 connections = workspace.list_connections() 1230 1231 # Filter by name if requested 1232 if name_contains: 1233 needle = name_contains.lower() 1234 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1235 1236 # If failing_connections_only is True, implicitly enable with_connection_status 1237 if failing_connections_only: 1238 with_connection_status = True 1239 1240 results: list[CloudConnectionResult] = [] 1241 1242 for connection in connections: 1243 last_job_status: str | None = None 1244 last_job_id: int | None = None 1245 last_job_time: str | None = None 1246 currently_running_job_id: int | None = None 1247 currently_running_job_start_time: str | None = None 1248 1249 if with_connection_status: 1250 sync_logs = connection.get_previous_sync_logs(limit=5) 1251 last_completed_job_status = None # Keep enum for comparison 1252 1253 for sync_result in sync_logs: 1254 job_status = sync_result.get_job_status() 1255 1256 if not sync_result.is_job_complete(): 1257 currently_running_job_id = sync_result.job_id 1258 currently_running_job_start_time = sync_result.start_time.isoformat() 1259 continue 1260 1261 last_completed_job_status = job_status 1262 last_job_status = str(job_status.value) if job_status else None 1263 last_job_id = sync_result.job_id 1264 last_job_time = sync_result.start_time.isoformat() 1265 break 1266 1267 if failing_connections_only and ( 1268 last_completed_job_status is None 1269 or last_completed_job_status not in FAILED_STATUSES 1270 ): 1271 continue 1272 1273 results.append( 1274 CloudConnectionResult( 1275 id=connection.connection_id, 1276 name=cast(str, connection.name), 1277 url=cast(str, connection.connection_url), 1278 source_id=connection.source_id, 1279 destination_id=connection.destination_id, 1280 last_job_status=last_job_status, 1281 last_job_id=last_job_id, 1282 last_job_time=last_job_time, 1283 currently_running_job_id=currently_running_job_id, 1284 currently_running_job_start_time=currently_running_job_start_time, 1285 ) 1286 ) 1287 1288 if max_items_limit is not None and len(results) >= max_items_limit: 1289 break 1290 1291 return results 1292 1293 1294def _resolve_organization( 1295 organization_id: str | None, 1296 organization_name: str | None, 1297 *, 1298 api_root: str, 1299 client_id: SecretString | None, 1300 client_secret: SecretString | None, 1301 bearer_token: SecretString | None = None, 1302) -> CloudOrganization: 1303 """Resolve organization from either ID or exact name match. 1304 1305 Args: 1306 organization_id: The organization ID (if provided directly) 1307 organization_name: The organization name (exact match required) 1308 api_root: The API root URL 1309 client_id: OAuth client ID (optional if bearer_token is provided) 1310 client_secret: OAuth client secret (optional if bearer_token is provided) 1311 bearer_token: Bearer token for authentication (optional if client credentials provided) 1312 1313 Returns: 1314 A CloudOrganization object with credentials for lazy loading of billing info. 1315 1316 Raises: 1317 PyAirbyteInputError: If neither or both parameters are provided, 1318 or if no organization matches the exact name 1319 AirbyteMissingResourceError: If the organization is not found 1320 """ 1321 if organization_id and organization_name: 1322 raise PyAirbyteInputError( 1323 message="Provide either 'organization_id' or 'organization_name', not both." 1324 ) 1325 if not organization_id and not organization_name: 1326 raise PyAirbyteInputError( 1327 message="Either 'organization_id' or 'organization_name' must be provided." 1328 ) 1329 1330 # Get all organizations for the user 1331 orgs = api_util.list_organizations_for_user( 1332 api_root=api_root, 1333 client_id=client_id, 1334 client_secret=client_secret, 1335 bearer_token=bearer_token, 1336 ) 1337 1338 org_response: api_util.models.OrganizationResponse | None = None 1339 1340 if organization_id: 1341 # Find by ID 1342 matching_orgs = [org for org in orgs if org.organization_id == organization_id] 1343 if not matching_orgs: 1344 raise AirbyteMissingResourceError( 1345 resource_type="organization", 1346 context={ 1347 "organization_id": organization_id, 1348 "message": f"No organization found with ID '{organization_id}' " 1349 "for the current user.", 1350 }, 1351 ) 1352 org_response = matching_orgs[0] 1353 else: 1354 # Find by exact name match (case-sensitive) 1355 matching_orgs = [org for org in orgs if org.organization_name == organization_name] 1356 1357 if not matching_orgs: 1358 raise AirbyteMissingResourceError( 1359 resource_type="organization", 1360 context={ 1361 "organization_name": organization_name, 1362 "message": f"No organization found with exact name '{organization_name}' " 1363 "for the current user.", 1364 }, 1365 ) 1366 1367 if len(matching_orgs) > 1: 1368 raise PyAirbyteInputError( 1369 message=f"Multiple organizations found with name '{organization_name}'. " 1370 "Please use 'organization_id' instead to specify the exact organization." 1371 ) 1372 1373 org_response = matching_orgs[0] 1374 1375 # Return a CloudOrganization with credentials for lazy loading of billing info 1376 return CloudOrganization( 1377 organization_id=org_response.organization_id, 1378 organization_name=org_response.organization_name, 1379 email=org_response.email, 1380 api_root=api_root, 1381 client_id=client_id, 1382 client_secret=client_secret, 1383 bearer_token=bearer_token, 1384 ) 1385 1386 1387def _resolve_organization_id( 1388 organization_id: str | None, 1389 organization_name: str | None, 1390 *, 1391 api_root: str, 1392 client_id: SecretString | None, 1393 client_secret: SecretString | None, 1394 bearer_token: SecretString | None = None, 1395) -> str: 1396 """Resolve organization ID from either ID or exact name match. 1397 1398 This is a convenience wrapper around _resolve_organization that returns just the ID. 1399 """ 1400 org = _resolve_organization( 1401 organization_id=organization_id, 1402 organization_name=organization_name, 1403 api_root=api_root, 1404 client_id=client_id, 1405 client_secret=client_secret, 1406 bearer_token=bearer_token, 1407 ) 1408 return org.organization_id 1409 1410 1411@mcp_tool( 1412 read_only=True, 1413 idempotent=True, 1414 open_world=True, 1415 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1416) 1417def list_cloud_workspaces( 1418 ctx: Context, 1419 *, 1420 organization_id: Annotated[ 1421 str | None, 1422 Field( 1423 description="Organization ID. Required if organization_name is not provided.", 1424 default=None, 1425 ), 1426 ], 1427 organization_name: Annotated[ 1428 str | None, 1429 Field( 1430 description=( 1431 "Organization name (exact match). " "Required if organization_id is not provided." 1432 ), 1433 default=None, 1434 ), 1435 ], 1436 name_contains: Annotated[ 1437 str | None, 1438 Field( 1439 description="Optional substring to filter workspaces by name (server-side filtering)", 1440 default=None, 1441 ), 1442 ], 1443 max_items_limit: Annotated[ 1444 int | None, 1445 Field( 1446 description="Optional maximum number of items to return (default: no limit)", 1447 default=None, 1448 ), 1449 ], 1450) -> list[CloudWorkspaceResult]: 1451 """List all workspaces in a specific organization. 1452 1453 Requires either organization_id OR organization_name (exact match) to be provided. 1454 This tool will NOT list workspaces across all organizations - you must specify 1455 which organization to list workspaces from. 1456 """ 1457 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 1458 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 1459 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 1460 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 1461 1462 resolved_org_id = _resolve_organization_id( 1463 organization_id=organization_id, 1464 organization_name=organization_name, 1465 api_root=api_url, 1466 client_id=SecretString(client_id) if client_id else None, 1467 client_secret=SecretString(client_secret) if client_secret else None, 1468 bearer_token=SecretString(bearer_token) if bearer_token else None, 1469 ) 1470 1471 workspaces = api_util.list_workspaces_in_organization( 1472 organization_id=resolved_org_id, 1473 api_root=api_url, 1474 client_id=SecretString(client_id) if client_id else None, 1475 client_secret=SecretString(client_secret) if client_secret else None, 1476 bearer_token=SecretString(bearer_token) if bearer_token else None, 1477 name_contains=name_contains, 1478 max_items_limit=max_items_limit, 1479 ) 1480 1481 return [ 1482 CloudWorkspaceResult( 1483 workspace_id=ws.get("workspaceId", ""), 1484 workspace_name=ws.get("name", ""), 1485 organization_id=ws.get("organizationId", ""), 1486 ) 1487 for ws in workspaces 1488 ] 1489 1490 1491@mcp_tool( 1492 read_only=True, 1493 idempotent=True, 1494 open_world=True, 1495 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1496) 1497def describe_cloud_organization( 1498 ctx: Context, 1499 *, 1500 organization_id: Annotated[ 1501 str | None, 1502 Field( 1503 description="Organization ID. Required if organization_name is not provided.", 1504 default=None, 1505 ), 1506 ], 1507 organization_name: Annotated[ 1508 str | None, 1509 Field( 1510 description=( 1511 "Organization name (exact match). " "Required if organization_id is not provided." 1512 ), 1513 default=None, 1514 ), 1515 ], 1516) -> CloudOrganizationResult: 1517 """Get details about a specific organization including billing status. 1518 1519 Requires either organization_id OR organization_name (exact match) to be provided. 1520 This tool is useful for looking up an organization's ID from its name, or vice versa. 1521 """ 1522 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 1523 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 1524 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 1525 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 1526 1527 org = _resolve_organization( 1528 organization_id=organization_id, 1529 organization_name=organization_name, 1530 api_root=api_url, 1531 client_id=SecretString(client_id) if client_id else None, 1532 client_secret=SecretString(client_secret) if client_secret else None, 1533 bearer_token=SecretString(bearer_token) if bearer_token else None, 1534 ) 1535 1536 # CloudOrganization has lazy loading of billing properties 1537 return CloudOrganizationResult( 1538 id=org.organization_id, 1539 name=org.organization_name, 1540 email=org.email, 1541 payment_status=org.payment_status, 1542 subscription_status=org.subscription_status, 1543 is_account_locked=org.is_account_locked, 1544 ) 1545 1546 1547def _get_custom_source_definition_description( 1548 custom_source: CustomCloudSourceDefinition, 1549) -> str: 1550 return "\n".join( 1551 [ 1552 f" - Custom Source Name: {custom_source.name}", 1553 f" - Definition ID: {custom_source.definition_id}", 1554 f" - Definition Version: {custom_source.version}", 1555 f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}", 1556 f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}", 1557 ] 1558 ) 1559 1560 1561@mcp_tool( 1562 open_world=True, 1563 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1564) 1565def publish_custom_source_definition( 1566 ctx: Context, 1567 name: Annotated[ 1568 str, 1569 Field(description="The name for the custom connector definition."), 1570 ], 1571 *, 1572 workspace_id: Annotated[ 1573 str | None, 1574 Field( 1575 description=WORKSPACE_ID_TIP_TEXT, 1576 default=None, 1577 ), 1578 ], 1579 manifest_yaml: Annotated[ 1580 str | Path | None, 1581 Field( 1582 description=( 1583 "The Low-code CDK manifest as a YAML string or file path. " 1584 "Required for YAML connectors." 1585 ), 1586 default=None, 1587 ), 1588 ] = None, 1589 unique: Annotated[ 1590 bool, 1591 Field( 1592 description="Whether to require a unique name.", 1593 default=True, 1594 ), 1595 ] = True, 1596 pre_validate: Annotated[ 1597 bool, 1598 Field( 1599 description="Whether to validate the manifest client-side before publishing.", 1600 default=True, 1601 ), 1602 ] = True, 1603 testing_values: Annotated[ 1604 dict | str | None, 1605 Field( 1606 description=( 1607 "Optional testing configuration values for the Builder UI. " 1608 "Can be provided as a JSON object or JSON string. " 1609 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1610 "If provided, these values replace any existing testing values " 1611 "for the connector builder project, allowing immediate test read operations." 1612 ), 1613 default=None, 1614 ), 1615 ], 1616 testing_values_secret_name: Annotated[ 1617 str | None, 1618 Field( 1619 description=( 1620 "Optional name of a secret containing testing configuration values " 1621 "in JSON or YAML format. The secret will be resolved by the MCP " 1622 "server and merged into testing_values, with secret values taking " 1623 "precedence. This lets the agent reference secrets without sending " 1624 "raw values as tool arguments." 1625 ), 1626 default=None, 1627 ), 1628 ], 1629) -> str: 1630 """Publish a custom YAML source connector definition to Airbyte Cloud. 1631 1632 Note: Only YAML (declarative) connectors are currently supported. 1633 Docker-based custom sources are not yet available. 1634 """ 1635 processed_manifest = manifest_yaml 1636 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1637 processed_manifest = Path(manifest_yaml) 1638 1639 # Resolve testing values from inline config and/or secret 1640 testing_values_dict: dict[str, Any] | None = None 1641 if testing_values is not None or testing_values_secret_name is not None: 1642 testing_values_dict = ( 1643 resolve_connector_config( 1644 config=testing_values, 1645 config_secret_name=testing_values_secret_name, 1646 ) 1647 or None 1648 ) 1649 1650 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1651 custom_source = workspace.publish_custom_source_definition( 1652 name=name, 1653 manifest_yaml=processed_manifest, 1654 unique=unique, 1655 pre_validate=pre_validate, 1656 testing_values=testing_values_dict, 1657 ) 1658 register_guid_created_in_session(custom_source.definition_id) 1659 return ( 1660 "Successfully published custom YAML source definition:\n" 1661 + _get_custom_source_definition_description( 1662 custom_source=custom_source, 1663 ) 1664 + "\n" 1665 ) 1666 1667 1668@mcp_tool( 1669 read_only=True, 1670 idempotent=True, 1671 open_world=True, 1672) 1673def list_custom_source_definitions( 1674 ctx: Context, 1675 *, 1676 workspace_id: Annotated[ 1677 str | None, 1678 Field( 1679 description=WORKSPACE_ID_TIP_TEXT, 1680 default=None, 1681 ), 1682 ], 1683) -> list[dict[str, Any]]: 1684 """List custom YAML source definitions in the Airbyte Cloud workspace. 1685 1686 Note: Only YAML (declarative) connectors are currently supported. 1687 Docker-based custom sources are not yet available. 1688 """ 1689 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1690 definitions = workspace.list_custom_source_definitions( 1691 definition_type="yaml", 1692 ) 1693 1694 return [ 1695 { 1696 "definition_id": d.definition_id, 1697 "name": d.name, 1698 "version": d.version, 1699 "connector_builder_project_url": d.connector_builder_project_url, 1700 } 1701 for d in definitions 1702 ] 1703 1704 1705@mcp_tool( 1706 read_only=True, 1707 idempotent=True, 1708 open_world=True, 1709) 1710def get_custom_source_definition( 1711 ctx: Context, 1712 definition_id: Annotated[ 1713 str, 1714 Field(description="The ID of the custom source definition to retrieve."), 1715 ], 1716 *, 1717 workspace_id: Annotated[ 1718 str | None, 1719 Field( 1720 description=WORKSPACE_ID_TIP_TEXT, 1721 default=None, 1722 ), 1723 ], 1724 include_draft: Annotated[ 1725 bool, 1726 Field( 1727 description=( 1728 "Whether to include the Connector Builder draft manifest in the response. " 1729 "If True and a draft exists, the response will include 'has_draft' and " 1730 "'draft_manifest' fields. Defaults to False." 1731 ), 1732 default=False, 1733 ), 1734 ] = False, 1735) -> dict[str, Any]: 1736 """Get a custom YAML source definition from Airbyte Cloud, including its manifest. 1737 1738 Returns the full definition details including the published manifest YAML content. 1739 Optionally includes the Connector Builder draft manifest (unpublished changes) 1740 when include_draft=True. 1741 1742 Note: Only YAML (declarative) connectors are currently supported. 1743 Docker-based custom sources are not yet available. 1744 """ 1745 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1746 definition = workspace.get_custom_source_definition( 1747 definition_id=definition_id, 1748 definition_type="yaml", 1749 ) 1750 1751 result: dict[str, Any] = { 1752 "definition_id": definition.definition_id, 1753 "name": definition.name, 1754 "version": definition.version, 1755 "connector_builder_project_id": definition.connector_builder_project_id, 1756 "connector_builder_project_url": definition.connector_builder_project_url, 1757 "manifest": definition.manifest, 1758 } 1759 1760 if include_draft: 1761 result["has_draft"] = definition.has_draft 1762 result["draft_manifest"] = definition.draft_manifest 1763 1764 return result 1765 1766 1767@mcp_tool( 1768 read_only=True, 1769 idempotent=True, 1770 open_world=True, 1771) 1772def get_connector_builder_draft_manifest( 1773 ctx: Context, 1774 definition_id: Annotated[ 1775 str, 1776 Field(description="The ID of the custom source definition to retrieve the draft for."), 1777 ], 1778 *, 1779 workspace_id: Annotated[ 1780 str | None, 1781 Field( 1782 description=WORKSPACE_ID_TIP_TEXT, 1783 default=None, 1784 ), 1785 ], 1786) -> dict[str, Any]: 1787 """Get the Connector Builder draft manifest for a custom source definition. 1788 1789 Returns the working draft manifest that has been saved in the Connector Builder UI 1790 but not yet published. This is useful for inspecting what a user is currently working 1791 on before they publish their changes. 1792 1793 If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. 1794 The published manifest is always included for comparison. 1795 """ 1796 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1797 definition = workspace.get_custom_source_definition( 1798 definition_id=definition_id, 1799 definition_type="yaml", 1800 ) 1801 1802 return { 1803 "definition_id": definition.definition_id, 1804 "name": definition.name, 1805 "connector_builder_project_id": definition.connector_builder_project_id, 1806 "connector_builder_project_url": definition.connector_builder_project_url, 1807 "has_draft": definition.has_draft, 1808 "draft_manifest": definition.draft_manifest, 1809 "published_manifest": definition.manifest, 1810 } 1811 1812 1813@mcp_tool( 1814 destructive=True, 1815 open_world=True, 1816) 1817def update_custom_source_definition( 1818 ctx: Context, 1819 definition_id: Annotated[ 1820 str, 1821 Field(description="The ID of the definition to update."), 1822 ], 1823 manifest_yaml: Annotated[ 1824 str | Path | None, 1825 Field( 1826 description=( 1827 "New manifest as YAML string or file path. " 1828 "Optional; omit to update only testing values." 1829 ), 1830 default=None, 1831 ), 1832 ] = None, 1833 *, 1834 workspace_id: Annotated[ 1835 str | None, 1836 Field( 1837 description=WORKSPACE_ID_TIP_TEXT, 1838 default=None, 1839 ), 1840 ], 1841 pre_validate: Annotated[ 1842 bool, 1843 Field( 1844 description="Whether to validate the manifest client-side before updating.", 1845 default=True, 1846 ), 1847 ] = True, 1848 testing_values: Annotated[ 1849 dict | str | None, 1850 Field( 1851 description=( 1852 "Optional testing configuration values for the Builder UI. " 1853 "Can be provided as a JSON object or JSON string. " 1854 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1855 "If provided, these values replace any existing testing values " 1856 "for the connector builder project. The entire testing values object " 1857 "is overwritten, so pass the full set of values you want to persist." 1858 ), 1859 default=None, 1860 ), 1861 ], 1862 testing_values_secret_name: Annotated[ 1863 str | None, 1864 Field( 1865 description=( 1866 "Optional name of a secret containing testing configuration values " 1867 "in JSON or YAML format. The secret will be resolved by the MCP " 1868 "server and merged into testing_values, with secret values taking " 1869 "precedence. This lets the agent reference secrets without sending " 1870 "raw values as tool arguments." 1871 ), 1872 default=None, 1873 ), 1874 ], 1875) -> str: 1876 """Update a custom YAML source definition in Airbyte Cloud. 1877 1878 Updates the manifest and/or testing values for an existing custom source definition. 1879 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1880 """ 1881 check_guid_created_in_session(definition_id) 1882 1883 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1884 1885 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1886 raise PyAirbyteInputError( 1887 message=( 1888 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1889 "must be provided to update a custom source definition." 1890 ), 1891 context={ 1892 "definition_id": definition_id, 1893 "workspace_id": workspace.workspace_id, 1894 }, 1895 ) 1896 1897 processed_manifest: str | Path | None = manifest_yaml 1898 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1899 processed_manifest = Path(manifest_yaml) 1900 1901 # Resolve testing values from inline config and/or secret 1902 testing_values_dict: dict[str, Any] | None = None 1903 if testing_values is not None or testing_values_secret_name is not None: 1904 testing_values_dict = ( 1905 resolve_connector_config( 1906 config=testing_values, 1907 config_secret_name=testing_values_secret_name, 1908 ) 1909 or None 1910 ) 1911 1912 definition = workspace.get_custom_source_definition( 1913 definition_id=definition_id, 1914 definition_type="yaml", 1915 ) 1916 custom_source: CustomCloudSourceDefinition = definition 1917 1918 if processed_manifest is not None: 1919 custom_source = definition.update_definition( 1920 manifest_yaml=processed_manifest, 1921 pre_validate=pre_validate, 1922 ) 1923 1924 if testing_values_dict is not None: 1925 custom_source.set_testing_values(testing_values_dict) 1926 1927 return ( 1928 "Successfully updated custom YAML source definition:\n" 1929 + _get_custom_source_definition_description( 1930 custom_source=custom_source, 1931 ) 1932 ) 1933 1934 1935@mcp_tool( 1936 destructive=True, 1937 open_world=True, 1938) 1939def permanently_delete_custom_source_definition( 1940 ctx: Context, 1941 definition_id: Annotated[ 1942 str, 1943 Field(description="The ID of the custom source definition to delete."), 1944 ], 1945 name: Annotated[ 1946 str, 1947 Field(description="The expected name of the custom source definition (for verification)."), 1948 ], 1949 *, 1950 workspace_id: Annotated[ 1951 str | None, 1952 Field( 1953 description=WORKSPACE_ID_TIP_TEXT, 1954 default=None, 1955 ), 1956 ], 1957) -> str: 1958 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1959 1960 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1961 (case insensitive). 1962 1963 If the connector does not meet this requirement, the deletion will be rejected with a 1964 helpful error message. Instruct the user to rename the connector appropriately to authorize 1965 the deletion. 1966 1967 The provided name must match the actual name of the definition for the operation to proceed. 1968 This is a safety measure to ensure you are deleting the correct resource. 1969 1970 Note: Only YAML (declarative) connectors are currently supported. 1971 Docker-based custom sources are not yet available. 1972 """ 1973 check_guid_created_in_session(definition_id) 1974 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1975 definition = workspace.get_custom_source_definition( 1976 definition_id=definition_id, 1977 definition_type="yaml", 1978 ) 1979 actual_name: str = definition.name 1980 1981 # Verify the name matches 1982 if actual_name != name: 1983 raise PyAirbyteInputError( 1984 message=( 1985 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1986 "The provided name must exactly match the definition's actual name. " 1987 "This is a safety measure to prevent accidental deletion." 1988 ), 1989 context={ 1990 "definition_id": definition_id, 1991 "expected_name": name, 1992 "actual_name": actual_name, 1993 }, 1994 ) 1995 1996 definition.permanently_delete( 1997 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1998 ) 1999 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})" 2000 2001 2002@mcp_tool( 2003 destructive=True, 2004 open_world=True, 2005 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2006) 2007def permanently_delete_cloud_source( 2008 ctx: Context, 2009 source_id: Annotated[ 2010 str, 2011 Field(description="The ID of the deployed source to delete."), 2012 ], 2013 name: Annotated[ 2014 str, 2015 Field(description="The expected name of the source (for verification)."), 2016 ], 2017) -> str: 2018 """Permanently delete a deployed source connector from Airbyte Cloud. 2019 2020 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 2021 (case insensitive). 2022 2023 If the source does not meet this requirement, the deletion will be rejected with a 2024 helpful error message. Instruct the user to rename the source appropriately to authorize 2025 the deletion. 2026 2027 The provided name must match the actual name of the source for the operation to proceed. 2028 This is a safety measure to ensure you are deleting the correct resource. 2029 """ 2030 check_guid_created_in_session(source_id) 2031 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2032 source = workspace.get_source(source_id=source_id) 2033 actual_name: str = cast(str, source.name) 2034 2035 # Verify the name matches 2036 if actual_name != name: 2037 raise PyAirbyteInputError( 2038 message=( 2039 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2040 "The provided name must exactly match the source's actual name. " 2041 "This is a safety measure to prevent accidental deletion." 2042 ), 2043 context={ 2044 "source_id": source_id, 2045 "expected_name": name, 2046 "actual_name": actual_name, 2047 }, 2048 ) 2049 2050 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2051 workspace.permanently_delete_source( 2052 source=source_id, 2053 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 2054 ) 2055 return f"Successfully deleted source '{actual_name}' (ID: {source_id})" 2056 2057 2058@mcp_tool( 2059 destructive=True, 2060 open_world=True, 2061 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2062) 2063def permanently_delete_cloud_destination( 2064 ctx: Context, 2065 destination_id: Annotated[ 2066 str, 2067 Field(description="The ID of the deployed destination to delete."), 2068 ], 2069 name: Annotated[ 2070 str, 2071 Field(description="The expected name of the destination (for verification)."), 2072 ], 2073) -> str: 2074 """Permanently delete a deployed destination connector from Airbyte Cloud. 2075 2076 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 2077 (case insensitive). 2078 2079 If the destination does not meet this requirement, the deletion will be rejected with a 2080 helpful error message. Instruct the user to rename the destination appropriately to authorize 2081 the deletion. 2082 2083 The provided name must match the actual name of the destination for the operation to proceed. 2084 This is a safety measure to ensure you are deleting the correct resource. 2085 """ 2086 check_guid_created_in_session(destination_id) 2087 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2088 destination = workspace.get_destination(destination_id=destination_id) 2089 actual_name: str = cast(str, destination.name) 2090 2091 # Verify the name matches 2092 if actual_name != name: 2093 raise PyAirbyteInputError( 2094 message=( 2095 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2096 "The provided name must exactly match the destination's actual name. " 2097 "This is a safety measure to prevent accidental deletion." 2098 ), 2099 context={ 2100 "destination_id": destination_id, 2101 "expected_name": name, 2102 "actual_name": actual_name, 2103 }, 2104 ) 2105 2106 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2107 workspace.permanently_delete_destination( 2108 destination=destination_id, 2109 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2110 ) 2111 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})" 2112 2113 2114@mcp_tool( 2115 destructive=True, 2116 open_world=True, 2117 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2118) 2119def permanently_delete_cloud_connection( 2120 ctx: Context, 2121 connection_id: Annotated[ 2122 str, 2123 Field(description="The ID of the connection to delete."), 2124 ], 2125 name: Annotated[ 2126 str, 2127 Field(description="The expected name of the connection (for verification)."), 2128 ], 2129 *, 2130 cascade_delete_source: Annotated[ 2131 bool, 2132 Field( 2133 description=( 2134 "Whether to also delete the source connector associated with this connection." 2135 ), 2136 default=False, 2137 ), 2138 ] = False, 2139 cascade_delete_destination: Annotated[ 2140 bool, 2141 Field( 2142 description=( 2143 "Whether to also delete the destination connector associated with this connection." 2144 ), 2145 default=False, 2146 ), 2147 ] = False, 2148) -> str: 2149 """Permanently delete a connection from Airbyte Cloud. 2150 2151 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 2152 (case insensitive). 2153 2154 If the connection does not meet this requirement, the deletion will be rejected with a 2155 helpful error message. Instruct the user to rename the connection appropriately to authorize 2156 the deletion. 2157 2158 The provided name must match the actual name of the connection for the operation to proceed. 2159 This is a safety measure to ensure you are deleting the correct resource. 2160 """ 2161 check_guid_created_in_session(connection_id) 2162 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2163 connection = workspace.get_connection(connection_id=connection_id) 2164 actual_name: str = cast(str, connection.name) 2165 2166 # Verify the name matches 2167 if actual_name != name: 2168 raise PyAirbyteInputError( 2169 message=( 2170 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2171 "The provided name must exactly match the connection's actual name. " 2172 "This is a safety measure to prevent accidental deletion." 2173 ), 2174 context={ 2175 "connection_id": connection_id, 2176 "expected_name": name, 2177 "actual_name": actual_name, 2178 }, 2179 ) 2180 2181 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2182 workspace.permanently_delete_connection( 2183 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2184 connection=connection_id, 2185 cascade_delete_source=cascade_delete_source, 2186 cascade_delete_destination=cascade_delete_destination, 2187 ) 2188 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" 2189 2190 2191@mcp_tool( 2192 open_world=True, 2193 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2194) 2195def rename_cloud_source( 2196 ctx: Context, 2197 source_id: Annotated[ 2198 str, 2199 Field(description="The ID of the deployed source to rename."), 2200 ], 2201 name: Annotated[ 2202 str, 2203 Field(description="New name for the source."), 2204 ], 2205 *, 2206 workspace_id: Annotated[ 2207 str | None, 2208 Field( 2209 description=WORKSPACE_ID_TIP_TEXT, 2210 default=None, 2211 ), 2212 ], 2213) -> str: 2214 """Rename a deployed source connector on Airbyte Cloud.""" 2215 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2216 source = workspace.get_source(source_id=source_id) 2217 source.rename(name=name) 2218 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}" 2219 2220 2221@mcp_tool( 2222 destructive=True, 2223 open_world=True, 2224 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2225) 2226def update_cloud_source_config( 2227 ctx: Context, 2228 source_id: Annotated[ 2229 str, 2230 Field(description="The ID of the deployed source to update."), 2231 ], 2232 config: Annotated[ 2233 dict | str, 2234 Field( 2235 description="New configuration for the source connector.", 2236 ), 2237 ], 2238 config_secret_name: Annotated[ 2239 str | None, 2240 Field( 2241 description="The name of the secret containing the configuration.", 2242 default=None, 2243 ), 2244 ] = None, 2245 *, 2246 workspace_id: Annotated[ 2247 str | None, 2248 Field( 2249 description=WORKSPACE_ID_TIP_TEXT, 2250 default=None, 2251 ), 2252 ], 2253) -> str: 2254 """Update a deployed source connector's configuration on Airbyte Cloud. 2255 2256 This is a destructive operation that can break existing connections if the 2257 configuration is changed incorrectly. Use with caution. 2258 """ 2259 check_guid_created_in_session(source_id) 2260 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2261 source = workspace.get_source(source_id=source_id) 2262 2263 config_dict = resolve_connector_config( 2264 config=config, 2265 config_secret_name=config_secret_name, 2266 config_spec_jsonschema=None, # We don't have the spec here 2267 ) 2268 2269 source.update_config(config=config_dict) 2270 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}" 2271 2272 2273@mcp_tool( 2274 open_world=True, 2275 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2276) 2277def rename_cloud_destination( 2278 ctx: Context, 2279 destination_id: Annotated[ 2280 str, 2281 Field(description="The ID of the deployed destination to rename."), 2282 ], 2283 name: Annotated[ 2284 str, 2285 Field(description="New name for the destination."), 2286 ], 2287 *, 2288 workspace_id: Annotated[ 2289 str | None, 2290 Field( 2291 description=WORKSPACE_ID_TIP_TEXT, 2292 default=None, 2293 ), 2294 ], 2295) -> str: 2296 """Rename a deployed destination connector on Airbyte Cloud.""" 2297 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2298 destination = workspace.get_destination(destination_id=destination_id) 2299 destination.rename(name=name) 2300 return ( 2301 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2302 f"URL: {destination.connector_url}" 2303 ) 2304 2305 2306@mcp_tool( 2307 destructive=True, 2308 open_world=True, 2309 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2310) 2311def update_cloud_destination_config( 2312 ctx: Context, 2313 destination_id: Annotated[ 2314 str, 2315 Field(description="The ID of the deployed destination to update."), 2316 ], 2317 config: Annotated[ 2318 dict | str, 2319 Field( 2320 description="New configuration for the destination connector.", 2321 ), 2322 ], 2323 config_secret_name: Annotated[ 2324 str | None, 2325 Field( 2326 description="The name of the secret containing the configuration.", 2327 default=None, 2328 ), 2329 ], 2330 *, 2331 workspace_id: Annotated[ 2332 str | None, 2333 Field( 2334 description=WORKSPACE_ID_TIP_TEXT, 2335 default=None, 2336 ), 2337 ], 2338) -> str: 2339 """Update a deployed destination connector's configuration on Airbyte Cloud. 2340 2341 This is a destructive operation that can break existing connections if the 2342 configuration is changed incorrectly. Use with caution. 2343 """ 2344 check_guid_created_in_session(destination_id) 2345 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2346 destination = workspace.get_destination(destination_id=destination_id) 2347 2348 config_dict = resolve_connector_config( 2349 config=config, 2350 config_secret_name=config_secret_name, 2351 config_spec_jsonschema=None, # We don't have the spec here 2352 ) 2353 2354 destination.update_config(config=config_dict) 2355 return ( 2356 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2357 ) 2358 2359 2360@mcp_tool( 2361 open_world=True, 2362 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2363) 2364def rename_cloud_connection( 2365 ctx: Context, 2366 connection_id: Annotated[ 2367 str, 2368 Field(description="The ID of the connection to rename."), 2369 ], 2370 name: Annotated[ 2371 str, 2372 Field(description="New name for the connection."), 2373 ], 2374 *, 2375 workspace_id: Annotated[ 2376 str | None, 2377 Field( 2378 description=WORKSPACE_ID_TIP_TEXT, 2379 default=None, 2380 ), 2381 ], 2382) -> str: 2383 """Rename a connection on Airbyte Cloud.""" 2384 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2385 connection = workspace.get_connection(connection_id=connection_id) 2386 connection.rename(name=name) 2387 return ( 2388 f"Successfully renamed connection '{connection_id}' to '{name}'. " 2389 f"URL: {connection.connection_url}" 2390 ) 2391 2392 2393@mcp_tool( 2394 destructive=True, 2395 open_world=True, 2396 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2397) 2398def set_cloud_connection_table_prefix( 2399 ctx: Context, 2400 connection_id: Annotated[ 2401 str, 2402 Field(description="The ID of the connection to update."), 2403 ], 2404 prefix: Annotated[ 2405 str, 2406 Field(description="New table prefix to use when syncing to the destination."), 2407 ], 2408 *, 2409 workspace_id: Annotated[ 2410 str | None, 2411 Field( 2412 description=WORKSPACE_ID_TIP_TEXT, 2413 default=None, 2414 ), 2415 ], 2416) -> str: 2417 """Set the table prefix for a connection on Airbyte Cloud. 2418 2419 This is a destructive operation that can break downstream dependencies if the 2420 table prefix is changed incorrectly. Use with caution. 2421 """ 2422 check_guid_created_in_session(connection_id) 2423 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2424 connection = workspace.get_connection(connection_id=connection_id) 2425 connection.set_table_prefix(prefix=prefix) 2426 return ( 2427 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2428 f"URL: {connection.connection_url}" 2429 ) 2430 2431 2432@mcp_tool( 2433 destructive=True, 2434 open_world=True, 2435 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2436) 2437def set_cloud_connection_selected_streams( 2438 ctx: Context, 2439 connection_id: Annotated[ 2440 str, 2441 Field(description="The ID of the connection to update."), 2442 ], 2443 stream_names: Annotated[ 2444 str | list[str], 2445 Field( 2446 description=( 2447 "The selected stream names to sync within the connection. " 2448 "Must be an explicit stream name or list of streams." 2449 ) 2450 ), 2451 ], 2452 *, 2453 workspace_id: Annotated[ 2454 str | None, 2455 Field( 2456 description=WORKSPACE_ID_TIP_TEXT, 2457 default=None, 2458 ), 2459 ], 2460) -> str: 2461 """Set the selected streams for a connection on Airbyte Cloud. 2462 2463 This is a destructive operation that can break existing connections if the 2464 stream selection is changed incorrectly. Use with caution. 2465 """ 2466 check_guid_created_in_session(connection_id) 2467 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2468 connection = workspace.get_connection(connection_id=connection_id) 2469 2470 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2471 connection.set_selected_streams(stream_names=resolved_streams_list) 2472 2473 return ( 2474 f"Successfully set selected streams for connection '{connection_id}' " 2475 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2476 ) 2477 2478 2479@mcp_tool( 2480 open_world=True, 2481 destructive=True, 2482 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2483) 2484def update_cloud_connection( 2485 ctx: Context, 2486 connection_id: Annotated[ 2487 str, 2488 Field(description="The ID of the connection to update."), 2489 ], 2490 *, 2491 enabled: Annotated[ 2492 bool | None, 2493 Field( 2494 description=( 2495 "Set the connection's enabled status. " 2496 "True enables the connection (status='active'), " 2497 "False disables it (status='inactive'). " 2498 "Leave unset to keep the current status." 2499 ), 2500 default=None, 2501 ), 2502 ], 2503 cron_expression: Annotated[ 2504 str | None, 2505 Field( 2506 description=( 2507 "A cron expression defining when syncs should run. " 2508 "Examples: '0 0 * * *' (daily at midnight UTC), " 2509 "'0 */6 * * *' (every 6 hours), " 2510 "'0 0 * * 0' (weekly on Sunday at midnight UTC). " 2511 "Leave unset to keep the current schedule. " 2512 "Cannot be used together with 'manual_schedule'." 2513 ), 2514 default=None, 2515 ), 2516 ], 2517 manual_schedule: Annotated[ 2518 bool | None, 2519 Field( 2520 description=( 2521 "Set to True to disable automatic syncs (manual scheduling only). " 2522 "Syncs will only run when manually triggered. " 2523 "Cannot be used together with 'cron_expression'." 2524 ), 2525 default=None, 2526 ), 2527 ], 2528 workspace_id: Annotated[ 2529 str | None, 2530 Field( 2531 description=WORKSPACE_ID_TIP_TEXT, 2532 default=None, 2533 ), 2534 ], 2535) -> str: 2536 """Update a connection's settings on Airbyte Cloud. 2537 2538 This tool allows updating multiple connection settings in a single call: 2539 - Enable or disable the connection 2540 - Set a cron schedule for automatic syncs 2541 - Switch to manual scheduling (no automatic syncs) 2542 2543 At least one setting must be provided. The 'cron_expression' and 'manual_schedule' 2544 parameters are mutually exclusive. 2545 """ 2546 check_guid_created_in_session(connection_id) 2547 2548 # Validate that at least one setting is provided 2549 if enabled is None and cron_expression is None and manual_schedule is None: 2550 raise ValueError( 2551 "At least one setting must be provided: 'enabled', 'cron_expression', " 2552 "or 'manual_schedule'." 2553 ) 2554 2555 # Validate mutually exclusive schedule options 2556 if cron_expression is not None and manual_schedule is True: 2557 raise ValueError( 2558 "Cannot specify both 'cron_expression' and 'manual_schedule=True'. " 2559 "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' " 2560 "for manual-only syncs." 2561 ) 2562 2563 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2564 connection = workspace.get_connection(connection_id=connection_id) 2565 2566 changes_made: list[str] = [] 2567 2568 # Apply enabled status change 2569 if enabled is not None: 2570 connection.set_enabled(enabled=enabled) 2571 status_str = "enabled" if enabled else "disabled" 2572 changes_made.append(f"status set to '{status_str}'") 2573 2574 # Apply schedule change 2575 if cron_expression is not None: 2576 connection.set_schedule(cron_expression=cron_expression) 2577 changes_made.append(f"schedule set to '{cron_expression}'") 2578 elif manual_schedule is True: 2579 connection.set_manual_schedule() 2580 changes_made.append("schedule set to 'manual'") 2581 2582 changes_summary = ", ".join(changes_made) 2583 return ( 2584 f"Successfully updated connection '{connection_id}': {changes_summary}. " 2585 f"URL: {connection.connection_url}" 2586 ) 2587 2588 2589@mcp_tool( 2590 read_only=True, 2591 idempotent=True, 2592 open_world=True, 2593 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2594) 2595def get_connection_artifact( 2596 ctx: Context, 2597 connection_id: Annotated[ 2598 str, 2599 Field(description="The ID of the Airbyte Cloud connection."), 2600 ], 2601 artifact_type: Annotated[ 2602 Literal["state", "catalog"], 2603 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2604 ], 2605 *, 2606 workspace_id: Annotated[ 2607 str | None, 2608 Field( 2609 description=WORKSPACE_ID_TIP_TEXT, 2610 default=None, 2611 ), 2612 ], 2613) -> dict[str, Any]: 2614 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2615 2616 Retrieves the specified artifact for a connection: 2617 - 'state': Returns the full raw connection state including stateType and all 2618 state data, or {"ERROR": "..."} if no state is set. 2619 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2620 or {"ERROR": "..."} if not found. 2621 """ 2622 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2623 connection = workspace.get_connection(connection_id=connection_id) 2624 2625 if artifact_type == "state": 2626 result = connection.dump_raw_state() 2627 if result.get("stateType") == "not_set": 2628 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2629 return result 2630 2631 # artifact_type == "catalog" 2632 result = connection.dump_raw_catalog() 2633 if result is None: 2634 return {"ERROR": "No catalog found for this connection"} 2635 return result 2636 2637 2638def _add_defaults_for_exclude_args( 2639 exclude_args: list[str], 2640) -> None: 2641 """Patch registered tool functions to add Python-level defaults for excluded args. 2642 2643 FastMCP requires that excluded args have Python-level default values, but MCP tool 2644 functions should only use Field(default=...) in their Annotated type hints (not 2645 Python-level `= None`). This function bridges the gap by dynamically adding Python 2646 defaults to the function signatures at registration time, so the source code stays 2647 clean while satisfying FastMCP's requirement. 2648 2649 Args: 2650 exclude_args: List of argument names that will be excluded from the tool schema. 2651 """ 2652 import inspect # noqa: PLC0415 # Local import for optional patching logic 2653 2654 from fastmcp_extensions.decorators import ( # noqa: PLC0415 2655 _REGISTERED_TOOLS, # noqa: PLC2701 2656 ) 2657 2658 for func, _annotations in _REGISTERED_TOOLS: 2659 sig = inspect.signature(func) 2660 needs_patch = any( 2661 arg_name in sig.parameters 2662 and sig.parameters[arg_name].default is inspect.Parameter.empty 2663 for arg_name in exclude_args 2664 ) 2665 if needs_patch: 2666 new_params = [ 2667 p.replace(default=None) 2668 if name in exclude_args and p.default is inspect.Parameter.empty 2669 else p 2670 for name, p in sig.parameters.items() 2671 ] 2672 func.__signature__ = sig.replace(parameters=new_params) # type: ignore[attr-defined] 2673 2674 2675def register_cloud_tools(app: FastMCP) -> None: 2676 """Register cloud tools with the FastMCP app. 2677 2678 Args: 2679 app: FastMCP application instance 2680 """ 2681 exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None 2682 if exclude_args: 2683 _add_defaults_for_exclude_args(exclude_args) 2684 register_mcp_tools( 2685 app, 2686 mcp_module=__name__, 2687 exclude_args=exclude_args, 2688 )
44class CloudSourceResult(BaseModel): 45 """Information about a deployed source connector in Airbyte Cloud.""" 46 47 id: str 48 """The source ID.""" 49 name: str 50 """Display name of the source.""" 51 url: str 52 """Web URL for managing this source in Airbyte Cloud."""
Information about a deployed source connector in Airbyte Cloud.
55class CloudDestinationResult(BaseModel): 56 """Information about a deployed destination connector in Airbyte Cloud.""" 57 58 id: str 59 """The destination ID.""" 60 name: str 61 """Display name of the destination.""" 62 url: str 63 """Web URL for managing this destination in Airbyte Cloud."""
Information about a deployed destination connector in Airbyte Cloud.
66class CloudConnectionResult(BaseModel): 67 """Information about a deployed connection in Airbyte Cloud.""" 68 69 id: str 70 """The connection ID.""" 71 name: str 72 """Display name of the connection.""" 73 url: str 74 """Web URL for managing this connection in Airbyte Cloud.""" 75 source_id: str 76 """ID of the source used by this connection.""" 77 destination_id: str 78 """ID of the destination used by this connection.""" 79 last_job_status: str | None = None 80 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 81 Only populated when with_connection_status=True.""" 82 last_job_id: int | None = None 83 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 84 last_job_time: str | None = None 85 """ISO 8601 timestamp of the most recent completed sync. 86 Only populated when with_connection_status=True.""" 87 currently_running_job_id: int | None = None 88 """Job ID of a currently running sync, if any. 89 Only populated when with_connection_status=True.""" 90 currently_running_job_start_time: str | None = None 91 """ISO 8601 timestamp of when the currently running sync started. 92 Only populated when with_connection_status=True."""
Information about a deployed connection in Airbyte Cloud.
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.
Job ID of the most recent completed sync. Only populated when with_connection_status=True.
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.
95class CloudSourceDetails(BaseModel): 96 """Detailed information about a deployed source connector in Airbyte Cloud.""" 97 98 source_id: str 99 """The source ID.""" 100 source_name: str 101 """Display name of the source.""" 102 source_url: str 103 """Web URL for managing this source in Airbyte Cloud.""" 104 connector_definition_id: str 105 """The connector definition ID (e.g., the ID for 'source-postgres')."""
Detailed information about a deployed source connector in Airbyte Cloud.
108class CloudDestinationDetails(BaseModel): 109 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 110 111 destination_id: str 112 """The destination ID.""" 113 destination_name: str 114 """Display name of the destination.""" 115 destination_url: str 116 """Web URL for managing this destination in Airbyte Cloud.""" 117 connector_definition_id: str 118 """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
Detailed information about a deployed destination connector in Airbyte Cloud.
121class CloudConnectionDetails(BaseModel): 122 """Detailed information about a deployed connection in Airbyte Cloud.""" 123 124 connection_id: str 125 """The connection ID.""" 126 connection_name: str 127 """Display name of the connection.""" 128 connection_url: str 129 """Web URL for managing this connection in Airbyte Cloud.""" 130 source_id: str 131 """ID of the source used by this connection.""" 132 source_name: str 133 """Display name of the source.""" 134 destination_id: str 135 """ID of the destination used by this connection.""" 136 destination_name: str 137 """Display name of the destination.""" 138 selected_streams: list[str] 139 """List of stream names selected for syncing.""" 140 table_prefix: str | None 141 """Table prefix applied when syncing to the destination."""
Detailed information about a deployed connection in Airbyte Cloud.
144class CloudOrganizationResult(BaseModel): 145 """Information about an organization in Airbyte Cloud.""" 146 147 id: str 148 """The organization ID.""" 149 name: str 150 """Display name of the organization.""" 151 email: str 152 """Email associated with the organization.""" 153 payment_status: str | None = None 154 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 155 When 'disabled', syncs are blocked due to unpaid invoices.""" 156 subscription_status: str | None = None 157 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 158 'unsubscribed').""" 159 is_account_locked: bool = False 160 """Whether the account is locked due to billing issues. 161 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 162 Defaults to False unless we have affirmative evidence of a locked state."""
Information about an organization in Airbyte Cloud.
Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices.
165class CloudWorkspaceResult(BaseModel): 166 """Information about a workspace in Airbyte Cloud.""" 167 168 workspace_id: str 169 """The workspace ID.""" 170 workspace_name: str 171 """Display name of the workspace.""" 172 workspace_url: str | None = None 173 """URL to access the workspace in Airbyte Cloud.""" 174 organization_id: str 175 """ID of the organization (requires ORGANIZATION_READER permission).""" 176 organization_name: str | None = None 177 """Name of the organization (requires ORGANIZATION_READER permission).""" 178 payment_status: str | None = None 179 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 180 When 'disabled', syncs are blocked due to unpaid invoices. 181 Requires ORGANIZATION_READER permission.""" 182 subscription_status: str | None = None 183 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 184 'unsubscribed'). Requires ORGANIZATION_READER permission.""" 185 is_account_locked: bool = False 186 """Whether the account is locked due to billing issues. 187 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 188 Defaults to False unless we have affirmative evidence of a locked state. 189 Requires ORGANIZATION_READER permission."""
Information about a workspace in Airbyte Cloud.
ID of the organization (requires ORGANIZATION_READER permission).
Name of the organization (requires ORGANIZATION_READER permission).
Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices. Requires ORGANIZATION_READER permission.
192class LogReadResult(BaseModel): 193 """Result of reading sync logs with pagination support.""" 194 195 job_id: int 196 """The job ID the logs belong to.""" 197 attempt_number: int 198 """The attempt number the logs belong to.""" 199 log_text: str 200 """The string containing the log text we are returning.""" 201 log_text_start_line: int 202 """1-based line index of the first line returned.""" 203 log_text_line_count: int 204 """Count of lines we are returning.""" 205 total_log_lines_available: int 206 """Total number of log lines available, shows if any lines were missed due to the limit."""
Result of reading sync logs with pagination support.
209class SyncJobResult(BaseModel): 210 """Information about a sync job.""" 211 212 job_id: int 213 """The job ID.""" 214 status: str 215 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 216 bytes_synced: int 217 """Number of bytes synced in this job.""" 218 records_synced: int 219 """Number of records synced in this job.""" 220 start_time: str 221 """ISO 8601 timestamp of when the job started.""" 222 job_url: str 223 """URL to view the job in Airbyte Cloud."""
Information about a sync job.
226class SyncJobListResult(BaseModel): 227 """Result of listing sync jobs with pagination support.""" 228 229 jobs: list[SyncJobResult] 230 """List of sync jobs.""" 231 jobs_count: int 232 """Number of jobs returned in this response.""" 233 jobs_offset: int 234 """Offset used for this request (0 if not specified).""" 235 from_tail: bool 236 """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
Result of listing sync jobs with pagination support.
274@mcp_tool( 275 open_world=True, 276 extra_help_text=CLOUD_AUTH_TIP_TEXT, 277) 278def deploy_source_to_cloud( 279 ctx: Context, 280 source_name: Annotated[ 281 str, 282 Field(description="The name to use when deploying the source."), 283 ], 284 source_connector_name: Annotated[ 285 str, 286 Field(description="The name of the source connector (e.g., 'source-faker')."), 287 ], 288 *, 289 workspace_id: Annotated[ 290 str | None, 291 Field( 292 description=WORKSPACE_ID_TIP_TEXT, 293 default=None, 294 ), 295 ], 296 config: Annotated[ 297 dict | str | None, 298 Field( 299 description="The configuration for the source connector.", 300 default=None, 301 ), 302 ], 303 config_secret_name: Annotated[ 304 str | None, 305 Field( 306 description="The name of the secret containing the configuration.", 307 default=None, 308 ), 309 ], 310 unique: Annotated[ 311 bool, 312 Field( 313 description="Whether to require a unique name.", 314 default=True, 315 ), 316 ], 317) -> str: 318 """Deploy a source connector to Airbyte Cloud.""" 319 source = get_source( 320 source_connector_name, 321 no_executor=True, 322 ) 323 config_dict = resolve_connector_config( 324 config=config, 325 config_secret_name=config_secret_name, 326 config_spec_jsonschema=source.config_spec, 327 ) 328 source.set_config(config_dict, validate=True) 329 330 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 331 deployed_source = workspace.deploy_source( 332 name=source_name, 333 source=source, 334 unique=unique, 335 ) 336 337 register_guid_created_in_session(deployed_source.connector_id) 338 return ( 339 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 340 f" and URL: {deployed_source.connector_url}" 341 )
Deploy a source connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
344@mcp_tool( 345 open_world=True, 346 extra_help_text=CLOUD_AUTH_TIP_TEXT, 347) 348def deploy_destination_to_cloud( 349 ctx: Context, 350 destination_name: Annotated[ 351 str, 352 Field(description="The name to use when deploying the destination."), 353 ], 354 destination_connector_name: Annotated[ 355 str, 356 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 357 ], 358 *, 359 workspace_id: Annotated[ 360 str | None, 361 Field( 362 description=WORKSPACE_ID_TIP_TEXT, 363 default=None, 364 ), 365 ], 366 config: Annotated[ 367 dict | str | None, 368 Field( 369 description="The configuration for the destination connector.", 370 default=None, 371 ), 372 ], 373 config_secret_name: Annotated[ 374 str | None, 375 Field( 376 description="The name of the secret containing the configuration.", 377 default=None, 378 ), 379 ], 380 unique: Annotated[ 381 bool, 382 Field( 383 description="Whether to require a unique name.", 384 default=True, 385 ), 386 ], 387) -> str: 388 """Deploy a destination connector to Airbyte Cloud.""" 389 destination = get_destination( 390 destination_connector_name, 391 no_executor=True, 392 ) 393 config_dict = resolve_connector_config( 394 config=config, 395 config_secret_name=config_secret_name, 396 config_spec_jsonschema=destination.config_spec, 397 ) 398 destination.set_config(config_dict, validate=True) 399 400 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 401 deployed_destination = workspace.deploy_destination( 402 name=destination_name, 403 destination=destination, 404 unique=unique, 405 ) 406 407 register_guid_created_in_session(deployed_destination.connector_id) 408 return ( 409 f"Successfully deployed destination '{destination_name}' " 410 f"with ID: {deployed_destination.connector_id}" 411 )
Deploy a destination connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
414@mcp_tool( 415 open_world=True, 416 extra_help_text=CLOUD_AUTH_TIP_TEXT, 417) 418def create_connection_on_cloud( 419 ctx: Context, 420 connection_name: Annotated[ 421 str, 422 Field(description="The name of the connection."), 423 ], 424 source_id: Annotated[ 425 str, 426 Field(description="The ID of the deployed source."), 427 ], 428 destination_id: Annotated[ 429 str, 430 Field(description="The ID of the deployed destination."), 431 ], 432 selected_streams: Annotated[ 433 str | list[str], 434 Field( 435 description=( 436 "The selected stream names to sync within the connection. " 437 "Must be an explicit stream name or list of streams. " 438 "Cannot be empty or '*'." 439 ) 440 ), 441 ], 442 *, 443 workspace_id: Annotated[ 444 str | None, 445 Field( 446 description=WORKSPACE_ID_TIP_TEXT, 447 default=None, 448 ), 449 ], 450 table_prefix: Annotated[ 451 str | None, 452 Field( 453 description="Optional table prefix to use when syncing to the destination.", 454 default=None, 455 ), 456 ], 457) -> str: 458 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 459 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 460 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 461 deployed_connection = workspace.deploy_connection( 462 connection_name=connection_name, 463 source=source_id, 464 destination=destination_id, 465 selected_streams=resolved_streams_list, 466 table_prefix=table_prefix, 467 ) 468 469 register_guid_created_in_session(deployed_connection.connection_id) 470 return ( 471 f"Successfully created connection '{connection_name}' " 472 f"with ID '{deployed_connection.connection_id}' and " 473 f"URL: {deployed_connection.connection_url}" 474 )
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
477@mcp_tool( 478 open_world=True, 479 extra_help_text=CLOUD_AUTH_TIP_TEXT, 480) 481def run_cloud_sync( 482 ctx: Context, 483 connection_id: Annotated[ 484 str, 485 Field(description="The ID of the Airbyte Cloud connection."), 486 ], 487 *, 488 workspace_id: Annotated[ 489 str | None, 490 Field( 491 description=WORKSPACE_ID_TIP_TEXT, 492 default=None, 493 ), 494 ], 495 wait: Annotated[ 496 bool, 497 Field( 498 description=( 499 "Whether to wait for the sync to complete. Since a sync can take between several " 500 "minutes and several hours, this option is not recommended for most " 501 "scenarios." 502 ), 503 default=False, 504 ), 505 ], 506 wait_timeout: Annotated[ 507 int, 508 Field( 509 description="Maximum time to wait for sync completion (seconds).", 510 default=300, 511 ), 512 ], 513) -> str: 514 """Run a sync job on Airbyte Cloud.""" 515 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 516 connection = workspace.get_connection(connection_id=connection_id) 517 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 518 519 if wait: 520 status = sync_result.get_job_status() 521 return ( 522 f"Sync completed with status: {status}. " 523 f"Job ID is '{sync_result.job_id}' and " 524 f"job URL is: {sync_result.job_url}" 525 ) 526 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
Run a sync job on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
529@mcp_tool( 530 read_only=True, 531 idempotent=True, 532 open_world=True, 533 extra_help_text=CLOUD_AUTH_TIP_TEXT, 534) 535def check_airbyte_cloud_workspace( 536 ctx: Context, 537 *, 538 workspace_id: Annotated[ 539 str | None, 540 Field( 541 description=WORKSPACE_ID_TIP_TEXT, 542 default=None, 543 ), 544 ], 545) -> CloudWorkspaceResult: 546 """Check if we have a valid Airbyte Cloud connection and return workspace info. 547 548 Returns workspace details including workspace ID, name, organization info, and billing status. 549 """ 550 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 551 552 # Get workspace details from the public API using workspace's credentials 553 workspace_response = api_util.get_workspace( 554 workspace_id=workspace.workspace_id, 555 api_root=workspace.api_root, 556 client_id=workspace.client_id, 557 client_secret=workspace.client_secret, 558 bearer_token=workspace.bearer_token, 559 ) 560 561 # Try to get organization info (including billing), but fail gracefully if we don't have 562 # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the 563 # organization, which may not be available with workspace-scoped credentials. 564 organization = workspace.get_organization(raise_on_error=False) 565 566 return CloudWorkspaceResult( 567 workspace_id=workspace_response.workspace_id, 568 workspace_name=workspace_response.name, 569 workspace_url=workspace.workspace_url, 570 organization_id=( 571 organization.organization_id 572 if organization 573 else "[unavailable - requires ORGANIZATION_READER permission]" 574 ), 575 organization_name=organization.organization_name if organization else None, 576 payment_status=organization.payment_status if organization else None, 577 subscription_status=organization.subscription_status if organization else None, 578 is_account_locked=organization.is_account_locked if organization else False, 579 )
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace details including workspace ID, name, organization info, and billing status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
582@mcp_tool( 583 open_world=True, 584 extra_help_text=CLOUD_AUTH_TIP_TEXT, 585) 586def deploy_noop_destination_to_cloud( 587 ctx: Context, 588 name: str = "No-op Destination", 589 *, 590 workspace_id: Annotated[ 591 str | None, 592 Field( 593 description=WORKSPACE_ID_TIP_TEXT, 594 default=None, 595 ), 596 ], 597 unique: bool = True, 598) -> str: 599 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 600 destination = get_noop_destination() 601 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 602 deployed_destination = workspace.deploy_destination( 603 name=name, 604 destination=destination, 605 unique=unique, 606 ) 607 register_guid_created_in_session(deployed_destination.connector_id) 608 return ( 609 f"Successfully deployed No-op Destination " 610 f"with ID '{deployed_destination.connector_id}' and " 611 f"URL: {deployed_destination.connector_url}" 612 )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
615@mcp_tool( 616 read_only=True, 617 idempotent=True, 618 open_world=True, 619 extra_help_text=CLOUD_AUTH_TIP_TEXT, 620) 621def get_cloud_sync_status( 622 ctx: Context, 623 connection_id: Annotated[ 624 str, 625 Field( 626 description="The ID of the Airbyte Cloud connection.", 627 ), 628 ], 629 job_id: Annotated[ 630 int | None, 631 Field( 632 description="Optional job ID. If not provided, the latest job will be used.", 633 default=None, 634 ), 635 ], 636 *, 637 workspace_id: Annotated[ 638 str | None, 639 Field( 640 description=WORKSPACE_ID_TIP_TEXT, 641 default=None, 642 ), 643 ], 644 include_attempts: Annotated[ 645 bool, 646 Field( 647 description="Whether to include detailed attempts information.", 648 default=False, 649 ), 650 ], 651) -> dict[str, Any]: 652 """Get the status of a sync job from the Airbyte Cloud.""" 653 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 654 connection = workspace.get_connection(connection_id=connection_id) 655 656 # If a job ID is provided, get the job by ID. 657 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 658 659 if not sync_result: 660 return {"status": None, "job_id": None, "attempts": []} 661 662 result = { 663 "status": sync_result.get_job_status(), 664 "job_id": sync_result.job_id, 665 "bytes_synced": sync_result.bytes_synced, 666 "records_synced": sync_result.records_synced, 667 "start_time": sync_result.start_time.isoformat(), 668 "job_url": sync_result.job_url, 669 "attempts": [], 670 } 671 672 if include_attempts: 673 attempts = sync_result.get_attempts() 674 result["attempts"] = [ 675 { 676 "attempt_number": attempt.attempt_number, 677 "attempt_id": attempt.attempt_id, 678 "status": attempt.status, 679 "bytes_synced": attempt.bytes_synced, 680 "records_synced": attempt.records_synced, 681 "created_at": attempt.created_at.isoformat(), 682 } 683 for attempt in attempts 684 ] 685 686 return result
Get the status of a sync job from the Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
689@mcp_tool( 690 read_only=True, 691 idempotent=True, 692 open_world=True, 693 extra_help_text=CLOUD_AUTH_TIP_TEXT, 694) 695def list_cloud_sync_jobs( 696 ctx: Context, 697 connection_id: Annotated[ 698 str, 699 Field(description="The ID of the Airbyte Cloud connection."), 700 ], 701 *, 702 workspace_id: Annotated[ 703 str | None, 704 Field( 705 description=WORKSPACE_ID_TIP_TEXT, 706 default=None, 707 ), 708 ], 709 max_jobs: Annotated[ 710 int, 711 Field( 712 description=( 713 "Maximum number of jobs to return. " 714 "Defaults to 20 if not specified. " 715 "Maximum allowed value is 500." 716 ), 717 default=20, 718 ), 719 ], 720 from_tail: Annotated[ 721 bool | None, 722 Field( 723 description=( 724 "When True, jobs are ordered newest-first (createdAt DESC). " 725 "When False, jobs are ordered oldest-first (createdAt ASC). " 726 "Defaults to True if `jobs_offset` is not specified. " 727 "Cannot combine `from_tail=True` with `jobs_offset`." 728 ), 729 default=None, 730 ), 731 ], 732 jobs_offset: Annotated[ 733 int | None, 734 Field( 735 description=( 736 "Number of jobs to skip from the beginning. " 737 "Cannot be combined with `from_tail=True`." 738 ), 739 default=None, 740 ), 741 ], 742 job_type: Annotated[ 743 JobTypeEnum | None, 744 Field( 745 description=( 746 "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. " 747 "If not specified, defaults to sync and reset jobs only (API default). " 748 "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs." 749 ), 750 default=None, 751 ), 752 ], 753) -> SyncJobListResult: 754 """List sync jobs for a connection with pagination support. 755 756 This tool allows you to retrieve a list of sync jobs for a connection, 757 with control over ordering and pagination. By default, jobs are returned 758 newest-first (from_tail=True). 759 """ 760 # Validate that jobs_offset and from_tail are not both set 761 if jobs_offset is not None and from_tail is True: 762 raise PyAirbyteInputError( 763 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 764 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 765 ) 766 767 # Default to from_tail=True if neither is specified 768 if from_tail is None and jobs_offset is None: 769 from_tail = True 770 elif from_tail is None: 771 from_tail = False 772 773 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 774 connection = workspace.get_connection(connection_id=connection_id) 775 776 # Cap at 500 to avoid overloading agent context 777 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 778 779 sync_results = connection.get_previous_sync_logs( 780 limit=effective_limit, 781 offset=jobs_offset, 782 from_tail=from_tail, 783 job_type=job_type, 784 ) 785 786 jobs = [ 787 SyncJobResult( 788 job_id=sync_result.job_id, 789 status=str(sync_result.get_job_status()), 790 bytes_synced=sync_result.bytes_synced, 791 records_synced=sync_result.records_synced, 792 start_time=sync_result.start_time.isoformat(), 793 job_url=sync_result.job_url, 794 ) 795 for sync_result in sync_results 796 ] 797 798 return SyncJobListResult( 799 jobs=jobs, 800 jobs_count=len(jobs), 801 jobs_offset=jobs_offset or 0, 802 from_tail=from_tail, 803 )
List sync jobs for a connection with pagination support.
This tool allows you to retrieve a list of sync jobs for a connection,
with control over ordering and pagination. By default, jobs are returned
newest-first (from_tail=True).
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
806@mcp_tool( 807 read_only=True, 808 idempotent=True, 809 open_world=True, 810 extra_help_text=CLOUD_AUTH_TIP_TEXT, 811) 812def list_deployed_cloud_source_connectors( 813 ctx: Context, 814 *, 815 workspace_id: Annotated[ 816 str | None, 817 Field( 818 description=WORKSPACE_ID_TIP_TEXT, 819 default=None, 820 ), 821 ], 822 name_contains: Annotated[ 823 str | None, 824 Field( 825 description="Optional case-insensitive substring to filter sources by name", 826 default=None, 827 ), 828 ], 829 max_items_limit: Annotated[ 830 int | None, 831 Field( 832 description="Optional maximum number of items to return (default: no limit)", 833 default=None, 834 ), 835 ], 836) -> list[CloudSourceResult]: 837 """List all deployed source connectors in the Airbyte Cloud workspace.""" 838 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 839 sources = workspace.list_sources() 840 841 # Filter by name if requested 842 if name_contains: 843 needle = name_contains.lower() 844 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 845 846 # Apply limit if requested 847 if max_items_limit is not None: 848 sources = sources[:max_items_limit] 849 850 # Note: name and url are guaranteed non-null from list API responses 851 return [ 852 CloudSourceResult( 853 id=source.source_id, 854 name=cast(str, source.name), 855 url=cast(str, source.connector_url), 856 ) 857 for source in sources 858 ]
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
861@mcp_tool( 862 read_only=True, 863 idempotent=True, 864 open_world=True, 865 extra_help_text=CLOUD_AUTH_TIP_TEXT, 866) 867def list_deployed_cloud_destination_connectors( 868 ctx: Context, 869 *, 870 workspace_id: Annotated[ 871 str | None, 872 Field( 873 description=WORKSPACE_ID_TIP_TEXT, 874 default=None, 875 ), 876 ], 877 name_contains: Annotated[ 878 str | None, 879 Field( 880 description="Optional case-insensitive substring to filter destinations by name", 881 default=None, 882 ), 883 ], 884 max_items_limit: Annotated[ 885 int | None, 886 Field( 887 description="Optional maximum number of items to return (default: no limit)", 888 default=None, 889 ), 890 ], 891) -> list[CloudDestinationResult]: 892 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 893 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 894 destinations = workspace.list_destinations() 895 896 # Filter by name if requested 897 if name_contains: 898 needle = name_contains.lower() 899 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 900 901 # Apply limit if requested 902 if max_items_limit is not None: 903 destinations = destinations[:max_items_limit] 904 905 # Note: name and url are guaranteed non-null from list API responses 906 return [ 907 CloudDestinationResult( 908 id=destination.destination_id, 909 name=cast(str, destination.name), 910 url=cast(str, destination.connector_url), 911 ) 912 for destination in destinations 913 ]
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
916@mcp_tool( 917 read_only=True, 918 idempotent=True, 919 open_world=True, 920 extra_help_text=CLOUD_AUTH_TIP_TEXT, 921) 922def describe_cloud_source( 923 ctx: Context, 924 source_id: Annotated[ 925 str, 926 Field(description="The ID of the source to describe."), 927 ], 928 *, 929 workspace_id: Annotated[ 930 str | None, 931 Field( 932 description=WORKSPACE_ID_TIP_TEXT, 933 default=None, 934 ), 935 ], 936) -> CloudSourceDetails: 937 """Get detailed information about a specific deployed source connector.""" 938 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 939 source = workspace.get_source(source_id=source_id) 940 941 # Access name property to ensure _connector_info is populated 942 source_name = cast(str, source.name) 943 944 return CloudSourceDetails( 945 source_id=source.source_id, 946 source_name=source_name, 947 source_url=source.connector_url, 948 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 949 )
Get detailed information about a specific deployed source connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
952@mcp_tool( 953 read_only=True, 954 idempotent=True, 955 open_world=True, 956 extra_help_text=CLOUD_AUTH_TIP_TEXT, 957) 958def describe_cloud_destination( 959 ctx: Context, 960 destination_id: Annotated[ 961 str, 962 Field(description="The ID of the destination to describe."), 963 ], 964 *, 965 workspace_id: Annotated[ 966 str | None, 967 Field( 968 description=WORKSPACE_ID_TIP_TEXT, 969 default=None, 970 ), 971 ], 972) -> CloudDestinationDetails: 973 """Get detailed information about a specific deployed destination connector.""" 974 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 975 destination = workspace.get_destination(destination_id=destination_id) 976 977 # Access name property to ensure _connector_info is populated 978 destination_name = cast(str, destination.name) 979 980 return CloudDestinationDetails( 981 destination_id=destination.destination_id, 982 destination_name=destination_name, 983 destination_url=destination.connector_url, 984 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 985 )
Get detailed information about a specific deployed destination connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
988@mcp_tool( 989 read_only=True, 990 idempotent=True, 991 open_world=True, 992 extra_help_text=CLOUD_AUTH_TIP_TEXT, 993) 994def describe_cloud_connection( 995 ctx: Context, 996 connection_id: Annotated[ 997 str, 998 Field(description="The ID of the connection to describe."), 999 ], 1000 *, 1001 workspace_id: Annotated[ 1002 str | None, 1003 Field( 1004 description=WORKSPACE_ID_TIP_TEXT, 1005 default=None, 1006 ), 1007 ], 1008) -> CloudConnectionDetails: 1009 """Get detailed information about a specific deployed connection.""" 1010 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1011 connection = workspace.get_connection(connection_id=connection_id) 1012 1013 return CloudConnectionDetails( 1014 connection_id=connection.connection_id, 1015 connection_name=cast(str, connection.name), 1016 connection_url=cast(str, connection.connection_url), 1017 source_id=connection.source_id, 1018 source_name=cast(str, connection.source.name), 1019 destination_id=connection.destination_id, 1020 destination_name=cast(str, connection.destination.name), 1021 selected_streams=connection.stream_names, 1022 table_prefix=connection.table_prefix, 1023 )
Get detailed information about a specific deployed connection.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1026@mcp_tool( 1027 read_only=True, 1028 idempotent=True, 1029 open_world=True, 1030 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1031) 1032def get_cloud_sync_logs( 1033 ctx: Context, 1034 connection_id: Annotated[ 1035 str, 1036 Field(description="The ID of the Airbyte Cloud connection."), 1037 ], 1038 job_id: Annotated[ 1039 int | None, 1040 Field(description="Optional job ID. If not provided, the latest job will be used."), 1041 ] = None, 1042 attempt_number: Annotated[ 1043 int | None, 1044 Field( 1045 description="Optional attempt number. If not provided, the latest attempt will be used." 1046 ), 1047 ] = None, 1048 *, 1049 workspace_id: Annotated[ 1050 str | None, 1051 Field( 1052 description=WORKSPACE_ID_TIP_TEXT, 1053 default=None, 1054 ), 1055 ], 1056 max_lines: Annotated[ 1057 int, 1058 Field( 1059 description=( 1060 "Maximum number of lines to return. " 1061 "Defaults to 4000 if not specified. " 1062 "If '0' is provided, no limit is applied." 1063 ), 1064 default=4000, 1065 ), 1066 ], 1067 from_tail: Annotated[ 1068 bool | None, 1069 Field( 1070 description=( 1071 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 1072 "Defaults to True if `line_offset` is not specified. " 1073 "Cannot combine `from_tail=True` with `line_offset`." 1074 ), 1075 default=None, 1076 ), 1077 ], 1078 line_offset: Annotated[ 1079 int | None, 1080 Field( 1081 description=( 1082 "Number of lines to skip from the beginning of the logs. " 1083 "Cannot be combined with `from_tail=True`." 1084 ), 1085 default=None, 1086 ), 1087 ], 1088) -> LogReadResult: 1089 """Get the logs from a sync job attempt on Airbyte Cloud.""" 1090 # Validate that line_offset and from_tail are not both set 1091 if line_offset is not None and from_tail: 1092 raise PyAirbyteInputError( 1093 message="Cannot specify both 'line_offset' and 'from_tail' parameters.", 1094 context={"line_offset": line_offset, "from_tail": from_tail}, 1095 ) 1096 1097 if from_tail is None and line_offset is None: 1098 from_tail = True 1099 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1100 connection = workspace.get_connection(connection_id=connection_id) 1101 1102 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 1103 1104 if not sync_result: 1105 raise AirbyteMissingResourceError( 1106 resource_type="sync job", 1107 resource_name_or_id=connection_id, 1108 ) 1109 1110 attempts = sync_result.get_attempts() 1111 1112 if not attempts: 1113 raise AirbyteMissingResourceError( 1114 resource_type="sync attempt", 1115 resource_name_or_id=str(sync_result.job_id), 1116 ) 1117 1118 if attempt_number is not None: 1119 target_attempt = None 1120 for attempt in attempts: 1121 if attempt.attempt_number == attempt_number: 1122 target_attempt = attempt 1123 break 1124 1125 if target_attempt is None: 1126 raise AirbyteMissingResourceError( 1127 resource_type="sync attempt", 1128 resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}", 1129 ) 1130 else: 1131 target_attempt = max(attempts, key=lambda a: a.attempt_number) 1132 1133 logs = target_attempt.get_full_log_text() 1134 1135 if not logs: 1136 # Return empty result with zero lines 1137 return LogReadResult( 1138 log_text=( 1139 f"[No logs available for job '{sync_result.job_id}', " 1140 f"attempt {target_attempt.attempt_number}.]" 1141 ), 1142 log_text_start_line=1, 1143 log_text_line_count=0, 1144 total_log_lines_available=0, 1145 job_id=sync_result.job_id, 1146 attempt_number=target_attempt.attempt_number, 1147 ) 1148 1149 # Apply line limiting 1150 log_lines = logs.splitlines() 1151 total_lines = len(log_lines) 1152 1153 # Determine effective max_lines (0 means no limit) 1154 effective_max = total_lines if max_lines == 0 else max_lines 1155 1156 # Calculate start_index and slice based on from_tail or line_offset 1157 if from_tail: 1158 start_index = max(0, total_lines - effective_max) 1159 selected_lines = log_lines[start_index:][:effective_max] 1160 else: 1161 start_index = line_offset or 0 1162 selected_lines = log_lines[start_index : start_index + effective_max] 1163 1164 return LogReadResult( 1165 log_text="\n".join(selected_lines), 1166 log_text_start_line=start_index + 1, # Convert to 1-based index 1167 log_text_line_count=len(selected_lines), 1168 total_log_lines_available=total_lines, 1169 job_id=sync_result.job_id, 1170 attempt_number=target_attempt.attempt_number, 1171 )
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1174@mcp_tool( 1175 read_only=True, 1176 idempotent=True, 1177 open_world=True, 1178 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1179) 1180def list_deployed_cloud_connections( 1181 ctx: Context, 1182 *, 1183 workspace_id: Annotated[ 1184 str | None, 1185 Field( 1186 description=WORKSPACE_ID_TIP_TEXT, 1187 default=None, 1188 ), 1189 ], 1190 name_contains: Annotated[ 1191 str | None, 1192 Field( 1193 description="Optional case-insensitive substring to filter connections by name", 1194 default=None, 1195 ), 1196 ], 1197 max_items_limit: Annotated[ 1198 int | None, 1199 Field( 1200 description="Optional maximum number of items to return (default: no limit)", 1201 default=None, 1202 ), 1203 ], 1204 with_connection_status: Annotated[ 1205 bool | None, 1206 Field( 1207 description="If True, include status info for each connection's most recent sync job", 1208 default=False, 1209 ), 1210 ], 1211 failing_connections_only: Annotated[ 1212 bool | None, 1213 Field( 1214 description="If True, only return connections with failed/cancelled last sync", 1215 default=False, 1216 ), 1217 ], 1218) -> list[CloudConnectionResult]: 1219 """List all deployed connections in the Airbyte Cloud workspace. 1220 1221 When with_connection_status is True, each connection result will include 1222 information about the most recent sync job status, skipping over any 1223 currently in-progress syncs to find the last completed job. 1224 1225 When failing_connections_only is True, only connections where the most 1226 recent completed sync job failed or was cancelled will be returned. 1227 This implicitly enables with_connection_status. 1228 """ 1229 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1230 connections = workspace.list_connections() 1231 1232 # Filter by name if requested 1233 if name_contains: 1234 needle = name_contains.lower() 1235 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1236 1237 # If failing_connections_only is True, implicitly enable with_connection_status 1238 if failing_connections_only: 1239 with_connection_status = True 1240 1241 results: list[CloudConnectionResult] = [] 1242 1243 for connection in connections: 1244 last_job_status: str | None = None 1245 last_job_id: int | None = None 1246 last_job_time: str | None = None 1247 currently_running_job_id: int | None = None 1248 currently_running_job_start_time: str | None = None 1249 1250 if with_connection_status: 1251 sync_logs = connection.get_previous_sync_logs(limit=5) 1252 last_completed_job_status = None # Keep enum for comparison 1253 1254 for sync_result in sync_logs: 1255 job_status = sync_result.get_job_status() 1256 1257 if not sync_result.is_job_complete(): 1258 currently_running_job_id = sync_result.job_id 1259 currently_running_job_start_time = sync_result.start_time.isoformat() 1260 continue 1261 1262 last_completed_job_status = job_status 1263 last_job_status = str(job_status.value) if job_status else None 1264 last_job_id = sync_result.job_id 1265 last_job_time = sync_result.start_time.isoformat() 1266 break 1267 1268 if failing_connections_only and ( 1269 last_completed_job_status is None 1270 or last_completed_job_status not in FAILED_STATUSES 1271 ): 1272 continue 1273 1274 results.append( 1275 CloudConnectionResult( 1276 id=connection.connection_id, 1277 name=cast(str, connection.name), 1278 url=cast(str, connection.connection_url), 1279 source_id=connection.source_id, 1280 destination_id=connection.destination_id, 1281 last_job_status=last_job_status, 1282 last_job_id=last_job_id, 1283 last_job_time=last_job_time, 1284 currently_running_job_id=currently_running_job_id, 1285 currently_running_job_start_time=currently_running_job_start_time, 1286 ) 1287 ) 1288 1289 if max_items_limit is not None and len(results) >= max_items_limit: 1290 break 1291 1292 return results
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include
information about the most recent sync job status, skipping over any
currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most
recent completed sync job failed or was cancelled will be returned.
This implicitly enables with_connection_status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1412@mcp_tool( 1413 read_only=True, 1414 idempotent=True, 1415 open_world=True, 1416 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1417) 1418def list_cloud_workspaces( 1419 ctx: Context, 1420 *, 1421 organization_id: Annotated[ 1422 str | None, 1423 Field( 1424 description="Organization ID. Required if organization_name is not provided.", 1425 default=None, 1426 ), 1427 ], 1428 organization_name: Annotated[ 1429 str | None, 1430 Field( 1431 description=( 1432 "Organization name (exact match). " "Required if organization_id is not provided." 1433 ), 1434 default=None, 1435 ), 1436 ], 1437 name_contains: Annotated[ 1438 str | None, 1439 Field( 1440 description="Optional substring to filter workspaces by name (server-side filtering)", 1441 default=None, 1442 ), 1443 ], 1444 max_items_limit: Annotated[ 1445 int | None, 1446 Field( 1447 description="Optional maximum number of items to return (default: no limit)", 1448 default=None, 1449 ), 1450 ], 1451) -> list[CloudWorkspaceResult]: 1452 """List all workspaces in a specific organization. 1453 1454 Requires either organization_id OR organization_name (exact match) to be provided. 1455 This tool will NOT list workspaces across all organizations - you must specify 1456 which organization to list workspaces from. 1457 """ 1458 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 1459 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 1460 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 1461 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 1462 1463 resolved_org_id = _resolve_organization_id( 1464 organization_id=organization_id, 1465 organization_name=organization_name, 1466 api_root=api_url, 1467 client_id=SecretString(client_id) if client_id else None, 1468 client_secret=SecretString(client_secret) if client_secret else None, 1469 bearer_token=SecretString(bearer_token) if bearer_token else None, 1470 ) 1471 1472 workspaces = api_util.list_workspaces_in_organization( 1473 organization_id=resolved_org_id, 1474 api_root=api_url, 1475 client_id=SecretString(client_id) if client_id else None, 1476 client_secret=SecretString(client_secret) if client_secret else None, 1477 bearer_token=SecretString(bearer_token) if bearer_token else None, 1478 name_contains=name_contains, 1479 max_items_limit=max_items_limit, 1480 ) 1481 1482 return [ 1483 CloudWorkspaceResult( 1484 workspace_id=ws.get("workspaceId", ""), 1485 workspace_name=ws.get("name", ""), 1486 organization_id=ws.get("organizationId", ""), 1487 ) 1488 for ws in workspaces 1489 ]
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1492@mcp_tool( 1493 read_only=True, 1494 idempotent=True, 1495 open_world=True, 1496 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1497) 1498def describe_cloud_organization( 1499 ctx: Context, 1500 *, 1501 organization_id: Annotated[ 1502 str | None, 1503 Field( 1504 description="Organization ID. Required if organization_name is not provided.", 1505 default=None, 1506 ), 1507 ], 1508 organization_name: Annotated[ 1509 str | None, 1510 Field( 1511 description=( 1512 "Organization name (exact match). " "Required if organization_id is not provided." 1513 ), 1514 default=None, 1515 ), 1516 ], 1517) -> CloudOrganizationResult: 1518 """Get details about a specific organization including billing status. 1519 1520 Requires either organization_id OR organization_name (exact match) to be provided. 1521 This tool is useful for looking up an organization's ID from its name, or vice versa. 1522 """ 1523 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 1524 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 1525 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 1526 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 1527 1528 org = _resolve_organization( 1529 organization_id=organization_id, 1530 organization_name=organization_name, 1531 api_root=api_url, 1532 client_id=SecretString(client_id) if client_id else None, 1533 client_secret=SecretString(client_secret) if client_secret else None, 1534 bearer_token=SecretString(bearer_token) if bearer_token else None, 1535 ) 1536 1537 # CloudOrganization has lazy loading of billing properties 1538 return CloudOrganizationResult( 1539 id=org.organization_id, 1540 name=org.organization_name, 1541 email=org.email, 1542 payment_status=org.payment_status, 1543 subscription_status=org.subscription_status, 1544 is_account_locked=org.is_account_locked, 1545 )
Get details about a specific organization including billing status.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1562@mcp_tool( 1563 open_world=True, 1564 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1565) 1566def publish_custom_source_definition( 1567 ctx: Context, 1568 name: Annotated[ 1569 str, 1570 Field(description="The name for the custom connector definition."), 1571 ], 1572 *, 1573 workspace_id: Annotated[ 1574 str | None, 1575 Field( 1576 description=WORKSPACE_ID_TIP_TEXT, 1577 default=None, 1578 ), 1579 ], 1580 manifest_yaml: Annotated[ 1581 str | Path | None, 1582 Field( 1583 description=( 1584 "The Low-code CDK manifest as a YAML string or file path. " 1585 "Required for YAML connectors." 1586 ), 1587 default=None, 1588 ), 1589 ] = None, 1590 unique: Annotated[ 1591 bool, 1592 Field( 1593 description="Whether to require a unique name.", 1594 default=True, 1595 ), 1596 ] = True, 1597 pre_validate: Annotated[ 1598 bool, 1599 Field( 1600 description="Whether to validate the manifest client-side before publishing.", 1601 default=True, 1602 ), 1603 ] = True, 1604 testing_values: Annotated[ 1605 dict | str | None, 1606 Field( 1607 description=( 1608 "Optional testing configuration values for the Builder UI. " 1609 "Can be provided as a JSON object or JSON string. " 1610 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1611 "If provided, these values replace any existing testing values " 1612 "for the connector builder project, allowing immediate test read operations." 1613 ), 1614 default=None, 1615 ), 1616 ], 1617 testing_values_secret_name: Annotated[ 1618 str | None, 1619 Field( 1620 description=( 1621 "Optional name of a secret containing testing configuration values " 1622 "in JSON or YAML format. The secret will be resolved by the MCP " 1623 "server and merged into testing_values, with secret values taking " 1624 "precedence. This lets the agent reference secrets without sending " 1625 "raw values as tool arguments." 1626 ), 1627 default=None, 1628 ), 1629 ], 1630) -> str: 1631 """Publish a custom YAML source connector definition to Airbyte Cloud. 1632 1633 Note: Only YAML (declarative) connectors are currently supported. 1634 Docker-based custom sources are not yet available. 1635 """ 1636 processed_manifest = manifest_yaml 1637 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1638 processed_manifest = Path(manifest_yaml) 1639 1640 # Resolve testing values from inline config and/or secret 1641 testing_values_dict: dict[str, Any] | None = None 1642 if testing_values is not None or testing_values_secret_name is not None: 1643 testing_values_dict = ( 1644 resolve_connector_config( 1645 config=testing_values, 1646 config_secret_name=testing_values_secret_name, 1647 ) 1648 or None 1649 ) 1650 1651 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1652 custom_source = workspace.publish_custom_source_definition( 1653 name=name, 1654 manifest_yaml=processed_manifest, 1655 unique=unique, 1656 pre_validate=pre_validate, 1657 testing_values=testing_values_dict, 1658 ) 1659 register_guid_created_in_session(custom_source.definition_id) 1660 return ( 1661 "Successfully published custom YAML source definition:\n" 1662 + _get_custom_source_definition_description( 1663 custom_source=custom_source, 1664 ) 1665 + "\n" 1666 )
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
1669@mcp_tool( 1670 read_only=True, 1671 idempotent=True, 1672 open_world=True, 1673) 1674def list_custom_source_definitions( 1675 ctx: Context, 1676 *, 1677 workspace_id: Annotated[ 1678 str | None, 1679 Field( 1680 description=WORKSPACE_ID_TIP_TEXT, 1681 default=None, 1682 ), 1683 ], 1684) -> list[dict[str, Any]]: 1685 """List custom YAML source definitions in the Airbyte Cloud workspace. 1686 1687 Note: Only YAML (declarative) connectors are currently supported. 1688 Docker-based custom sources are not yet available. 1689 """ 1690 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1691 definitions = workspace.list_custom_source_definitions( 1692 definition_type="yaml", 1693 ) 1694 1695 return [ 1696 { 1697 "definition_id": d.definition_id, 1698 "name": d.name, 1699 "version": d.version, 1700 "connector_builder_project_url": d.connector_builder_project_url, 1701 } 1702 for d in definitions 1703 ]
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1706@mcp_tool( 1707 read_only=True, 1708 idempotent=True, 1709 open_world=True, 1710) 1711def get_custom_source_definition( 1712 ctx: Context, 1713 definition_id: Annotated[ 1714 str, 1715 Field(description="The ID of the custom source definition to retrieve."), 1716 ], 1717 *, 1718 workspace_id: Annotated[ 1719 str | None, 1720 Field( 1721 description=WORKSPACE_ID_TIP_TEXT, 1722 default=None, 1723 ), 1724 ], 1725 include_draft: Annotated[ 1726 bool, 1727 Field( 1728 description=( 1729 "Whether to include the Connector Builder draft manifest in the response. " 1730 "If True and a draft exists, the response will include 'has_draft' and " 1731 "'draft_manifest' fields. Defaults to False." 1732 ), 1733 default=False, 1734 ), 1735 ] = False, 1736) -> dict[str, Any]: 1737 """Get a custom YAML source definition from Airbyte Cloud, including its manifest. 1738 1739 Returns the full definition details including the published manifest YAML content. 1740 Optionally includes the Connector Builder draft manifest (unpublished changes) 1741 when include_draft=True. 1742 1743 Note: Only YAML (declarative) connectors are currently supported. 1744 Docker-based custom sources are not yet available. 1745 """ 1746 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1747 definition = workspace.get_custom_source_definition( 1748 definition_id=definition_id, 1749 definition_type="yaml", 1750 ) 1751 1752 result: dict[str, Any] = { 1753 "definition_id": definition.definition_id, 1754 "name": definition.name, 1755 "version": definition.version, 1756 "connector_builder_project_id": definition.connector_builder_project_id, 1757 "connector_builder_project_url": definition.connector_builder_project_url, 1758 "manifest": definition.manifest, 1759 } 1760 1761 if include_draft: 1762 result["has_draft"] = definition.has_draft 1763 result["draft_manifest"] = definition.draft_manifest 1764 1765 return result
Get a custom YAML source definition from Airbyte Cloud, including its manifest.
Returns the full definition details including the published manifest YAML content. Optionally includes the Connector Builder draft manifest (unpublished changes) when include_draft=True.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
1768@mcp_tool( 1769 read_only=True, 1770 idempotent=True, 1771 open_world=True, 1772) 1773def get_connector_builder_draft_manifest( 1774 ctx: Context, 1775 definition_id: Annotated[ 1776 str, 1777 Field(description="The ID of the custom source definition to retrieve the draft for."), 1778 ], 1779 *, 1780 workspace_id: Annotated[ 1781 str | None, 1782 Field( 1783 description=WORKSPACE_ID_TIP_TEXT, 1784 default=None, 1785 ), 1786 ], 1787) -> dict[str, Any]: 1788 """Get the Connector Builder draft manifest for a custom source definition. 1789 1790 Returns the working draft manifest that has been saved in the Connector Builder UI 1791 but not yet published. This is useful for inspecting what a user is currently working 1792 on before they publish their changes. 1793 1794 If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. 1795 The published manifest is always included for comparison. 1796 """ 1797 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1798 definition = workspace.get_custom_source_definition( 1799 definition_id=definition_id, 1800 definition_type="yaml", 1801 ) 1802 1803 return { 1804 "definition_id": definition.definition_id, 1805 "name": definition.name, 1806 "connector_builder_project_id": definition.connector_builder_project_id, 1807 "connector_builder_project_url": definition.connector_builder_project_url, 1808 "has_draft": definition.has_draft, 1809 "draft_manifest": definition.draft_manifest, 1810 "published_manifest": definition.manifest, 1811 }
Get the Connector Builder draft manifest for a custom source definition.
Returns the working draft manifest that has been saved in the Connector Builder UI but not yet published. This is useful for inspecting what a user is currently working on before they publish their changes.
If no draft exists, 'has_draft' will be False and 'draft_manifest' will be None. The published manifest is always included for comparison.
1814@mcp_tool( 1815 destructive=True, 1816 open_world=True, 1817) 1818def update_custom_source_definition( 1819 ctx: Context, 1820 definition_id: Annotated[ 1821 str, 1822 Field(description="The ID of the definition to update."), 1823 ], 1824 manifest_yaml: Annotated[ 1825 str | Path | None, 1826 Field( 1827 description=( 1828 "New manifest as YAML string or file path. " 1829 "Optional; omit to update only testing values." 1830 ), 1831 default=None, 1832 ), 1833 ] = None, 1834 *, 1835 workspace_id: Annotated[ 1836 str | None, 1837 Field( 1838 description=WORKSPACE_ID_TIP_TEXT, 1839 default=None, 1840 ), 1841 ], 1842 pre_validate: Annotated[ 1843 bool, 1844 Field( 1845 description="Whether to validate the manifest client-side before updating.", 1846 default=True, 1847 ), 1848 ] = True, 1849 testing_values: Annotated[ 1850 dict | str | None, 1851 Field( 1852 description=( 1853 "Optional testing configuration values for the Builder UI. " 1854 "Can be provided as a JSON object or JSON string. " 1855 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1856 "If provided, these values replace any existing testing values " 1857 "for the connector builder project. The entire testing values object " 1858 "is overwritten, so pass the full set of values you want to persist." 1859 ), 1860 default=None, 1861 ), 1862 ], 1863 testing_values_secret_name: Annotated[ 1864 str | None, 1865 Field( 1866 description=( 1867 "Optional name of a secret containing testing configuration values " 1868 "in JSON or YAML format. The secret will be resolved by the MCP " 1869 "server and merged into testing_values, with secret values taking " 1870 "precedence. This lets the agent reference secrets without sending " 1871 "raw values as tool arguments." 1872 ), 1873 default=None, 1874 ), 1875 ], 1876) -> str: 1877 """Update a custom YAML source definition in Airbyte Cloud. 1878 1879 Updates the manifest and/or testing values for an existing custom source definition. 1880 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1881 """ 1882 check_guid_created_in_session(definition_id) 1883 1884 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1885 1886 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1887 raise PyAirbyteInputError( 1888 message=( 1889 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1890 "must be provided to update a custom source definition." 1891 ), 1892 context={ 1893 "definition_id": definition_id, 1894 "workspace_id": workspace.workspace_id, 1895 }, 1896 ) 1897 1898 processed_manifest: str | Path | None = manifest_yaml 1899 if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: 1900 processed_manifest = Path(manifest_yaml) 1901 1902 # Resolve testing values from inline config and/or secret 1903 testing_values_dict: dict[str, Any] | None = None 1904 if testing_values is not None or testing_values_secret_name is not None: 1905 testing_values_dict = ( 1906 resolve_connector_config( 1907 config=testing_values, 1908 config_secret_name=testing_values_secret_name, 1909 ) 1910 or None 1911 ) 1912 1913 definition = workspace.get_custom_source_definition( 1914 definition_id=definition_id, 1915 definition_type="yaml", 1916 ) 1917 custom_source: CustomCloudSourceDefinition = definition 1918 1919 if processed_manifest is not None: 1920 custom_source = definition.update_definition( 1921 manifest_yaml=processed_manifest, 1922 pre_validate=pre_validate, 1923 ) 1924 1925 if testing_values_dict is not None: 1926 custom_source.set_testing_values(testing_values_dict) 1927 1928 return ( 1929 "Successfully updated custom YAML source definition:\n" 1930 + _get_custom_source_definition_description( 1931 custom_source=custom_source, 1932 ) 1933 )
Update a custom YAML source definition in Airbyte Cloud.
Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
1936@mcp_tool( 1937 destructive=True, 1938 open_world=True, 1939) 1940def permanently_delete_custom_source_definition( 1941 ctx: Context, 1942 definition_id: Annotated[ 1943 str, 1944 Field(description="The ID of the custom source definition to delete."), 1945 ], 1946 name: Annotated[ 1947 str, 1948 Field(description="The expected name of the custom source definition (for verification)."), 1949 ], 1950 *, 1951 workspace_id: Annotated[ 1952 str | None, 1953 Field( 1954 description=WORKSPACE_ID_TIP_TEXT, 1955 default=None, 1956 ), 1957 ], 1958) -> str: 1959 """Permanently delete a custom YAML source definition from Airbyte Cloud. 1960 1961 IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" 1962 (case insensitive). 1963 1964 If the connector does not meet this requirement, the deletion will be rejected with a 1965 helpful error message. Instruct the user to rename the connector appropriately to authorize 1966 the deletion. 1967 1968 The provided name must match the actual name of the definition for the operation to proceed. 1969 This is a safety measure to ensure you are deleting the correct resource. 1970 1971 Note: Only YAML (declarative) connectors are currently supported. 1972 Docker-based custom sources are not yet available. 1973 """ 1974 check_guid_created_in_session(definition_id) 1975 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1976 definition = workspace.get_custom_source_definition( 1977 definition_id=definition_id, 1978 definition_type="yaml", 1979 ) 1980 actual_name: str = definition.name 1981 1982 # Verify the name matches 1983 if actual_name != name: 1984 raise PyAirbyteInputError( 1985 message=( 1986 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1987 "The provided name must exactly match the definition's actual name. " 1988 "This is a safety measure to prevent accidental deletion." 1989 ), 1990 context={ 1991 "definition_id": definition_id, 1992 "expected_name": name, 1993 "actual_name": actual_name, 1994 }, 1995 ) 1996 1997 definition.permanently_delete( 1998 safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. 1999 ) 2000 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
2003@mcp_tool( 2004 destructive=True, 2005 open_world=True, 2006 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2007) 2008def permanently_delete_cloud_source( 2009 ctx: Context, 2010 source_id: Annotated[ 2011 str, 2012 Field(description="The ID of the deployed source to delete."), 2013 ], 2014 name: Annotated[ 2015 str, 2016 Field(description="The expected name of the source (for verification)."), 2017 ], 2018) -> str: 2019 """Permanently delete a deployed source connector from Airbyte Cloud. 2020 2021 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 2022 (case insensitive). 2023 2024 If the source does not meet this requirement, the deletion will be rejected with a 2025 helpful error message. Instruct the user to rename the source appropriately to authorize 2026 the deletion. 2027 2028 The provided name must match the actual name of the source for the operation to proceed. 2029 This is a safety measure to ensure you are deleting the correct resource. 2030 """ 2031 check_guid_created_in_session(source_id) 2032 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2033 source = workspace.get_source(source_id=source_id) 2034 actual_name: str = cast(str, source.name) 2035 2036 # Verify the name matches 2037 if actual_name != name: 2038 raise PyAirbyteInputError( 2039 message=( 2040 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2041 "The provided name must exactly match the source's actual name. " 2042 "This is a safety measure to prevent accidental deletion." 2043 ), 2044 context={ 2045 "source_id": source_id, 2046 "expected_name": name, 2047 "actual_name": actual_name, 2048 }, 2049 ) 2050 2051 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2052 workspace.permanently_delete_source( 2053 source=source_id, 2054 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 2055 ) 2056 return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
(case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the source appropriately to authorize
the deletion.
The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2059@mcp_tool( 2060 destructive=True, 2061 open_world=True, 2062 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2063) 2064def permanently_delete_cloud_destination( 2065 ctx: Context, 2066 destination_id: Annotated[ 2067 str, 2068 Field(description="The ID of the deployed destination to delete."), 2069 ], 2070 name: Annotated[ 2071 str, 2072 Field(description="The expected name of the destination (for verification)."), 2073 ], 2074) -> str: 2075 """Permanently delete a deployed destination connector from Airbyte Cloud. 2076 2077 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 2078 (case insensitive). 2079 2080 If the destination does not meet this requirement, the deletion will be rejected with a 2081 helpful error message. Instruct the user to rename the destination appropriately to authorize 2082 the deletion. 2083 2084 The provided name must match the actual name of the destination for the operation to proceed. 2085 This is a safety measure to ensure you are deleting the correct resource. 2086 """ 2087 check_guid_created_in_session(destination_id) 2088 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2089 destination = workspace.get_destination(destination_id=destination_id) 2090 actual_name: str = cast(str, destination.name) 2091 2092 # Verify the name matches 2093 if actual_name != name: 2094 raise PyAirbyteInputError( 2095 message=( 2096 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2097 "The provided name must exactly match the destination's actual name. " 2098 "This is a safety measure to prevent accidental deletion." 2099 ), 2100 context={ 2101 "destination_id": destination_id, 2102 "expected_name": name, 2103 "actual_name": actual_name, 2104 }, 2105 ) 2106 2107 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2108 workspace.permanently_delete_destination( 2109 destination=destination_id, 2110 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2111 ) 2112 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
(case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the destination appropriately to authorize
the deletion.
The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2115@mcp_tool( 2116 destructive=True, 2117 open_world=True, 2118 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2119) 2120def permanently_delete_cloud_connection( 2121 ctx: Context, 2122 connection_id: Annotated[ 2123 str, 2124 Field(description="The ID of the connection to delete."), 2125 ], 2126 name: Annotated[ 2127 str, 2128 Field(description="The expected name of the connection (for verification)."), 2129 ], 2130 *, 2131 cascade_delete_source: Annotated[ 2132 bool, 2133 Field( 2134 description=( 2135 "Whether to also delete the source connector associated with this connection." 2136 ), 2137 default=False, 2138 ), 2139 ] = False, 2140 cascade_delete_destination: Annotated[ 2141 bool, 2142 Field( 2143 description=( 2144 "Whether to also delete the destination connector associated with this connection." 2145 ), 2146 default=False, 2147 ), 2148 ] = False, 2149) -> str: 2150 """Permanently delete a connection from Airbyte Cloud. 2151 2152 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 2153 (case insensitive). 2154 2155 If the connection does not meet this requirement, the deletion will be rejected with a 2156 helpful error message. Instruct the user to rename the connection appropriately to authorize 2157 the deletion. 2158 2159 The provided name must match the actual name of the connection for the operation to proceed. 2160 This is a safety measure to ensure you are deleting the correct resource. 2161 """ 2162 check_guid_created_in_session(connection_id) 2163 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2164 connection = workspace.get_connection(connection_id=connection_id) 2165 actual_name: str = cast(str, connection.name) 2166 2167 # Verify the name matches 2168 if actual_name != name: 2169 raise PyAirbyteInputError( 2170 message=( 2171 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2172 "The provided name must exactly match the connection's actual name. " 2173 "This is a safety measure to prevent accidental deletion." 2174 ), 2175 context={ 2176 "connection_id": connection_id, 2177 "expected_name": name, 2178 "actual_name": actual_name, 2179 }, 2180 ) 2181 2182 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2183 workspace.permanently_delete_connection( 2184 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2185 connection=connection_id, 2186 cascade_delete_source=cascade_delete_source, 2187 cascade_delete_destination=cascade_delete_destination, 2188 ) 2189 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
(case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a
helpful error message. Instruct the user to rename the connection appropriately to authorize
the deletion.
The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2192@mcp_tool( 2193 open_world=True, 2194 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2195) 2196def rename_cloud_source( 2197 ctx: Context, 2198 source_id: Annotated[ 2199 str, 2200 Field(description="The ID of the deployed source to rename."), 2201 ], 2202 name: Annotated[ 2203 str, 2204 Field(description="New name for the source."), 2205 ], 2206 *, 2207 workspace_id: Annotated[ 2208 str | None, 2209 Field( 2210 description=WORKSPACE_ID_TIP_TEXT, 2211 default=None, 2212 ), 2213 ], 2214) -> str: 2215 """Rename a deployed source connector on Airbyte Cloud.""" 2216 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2217 source = workspace.get_source(source_id=source_id) 2218 source.rename(name=name) 2219 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
Rename a deployed source connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2222@mcp_tool( 2223 destructive=True, 2224 open_world=True, 2225 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2226) 2227def update_cloud_source_config( 2228 ctx: Context, 2229 source_id: Annotated[ 2230 str, 2231 Field(description="The ID of the deployed source to update."), 2232 ], 2233 config: Annotated[ 2234 dict | str, 2235 Field( 2236 description="New configuration for the source connector.", 2237 ), 2238 ], 2239 config_secret_name: Annotated[ 2240 str | None, 2241 Field( 2242 description="The name of the secret containing the configuration.", 2243 default=None, 2244 ), 2245 ] = None, 2246 *, 2247 workspace_id: Annotated[ 2248 str | None, 2249 Field( 2250 description=WORKSPACE_ID_TIP_TEXT, 2251 default=None, 2252 ), 2253 ], 2254) -> str: 2255 """Update a deployed source connector's configuration on Airbyte Cloud. 2256 2257 This is a destructive operation that can break existing connections if the 2258 configuration is changed incorrectly. Use with caution. 2259 """ 2260 check_guid_created_in_session(source_id) 2261 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2262 source = workspace.get_source(source_id=source_id) 2263 2264 config_dict = resolve_connector_config( 2265 config=config, 2266 config_secret_name=config_secret_name, 2267 config_spec_jsonschema=None, # We don't have the spec here 2268 ) 2269 2270 source.update_config(config=config_dict) 2271 return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2274@mcp_tool( 2275 open_world=True, 2276 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2277) 2278def rename_cloud_destination( 2279 ctx: Context, 2280 destination_id: Annotated[ 2281 str, 2282 Field(description="The ID of the deployed destination to rename."), 2283 ], 2284 name: Annotated[ 2285 str, 2286 Field(description="New name for the destination."), 2287 ], 2288 *, 2289 workspace_id: Annotated[ 2290 str | None, 2291 Field( 2292 description=WORKSPACE_ID_TIP_TEXT, 2293 default=None, 2294 ), 2295 ], 2296) -> str: 2297 """Rename a deployed destination connector on Airbyte Cloud.""" 2298 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2299 destination = workspace.get_destination(destination_id=destination_id) 2300 destination.rename(name=name) 2301 return ( 2302 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2303 f"URL: {destination.connector_url}" 2304 )
Rename a deployed destination connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2307@mcp_tool( 2308 destructive=True, 2309 open_world=True, 2310 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2311) 2312def update_cloud_destination_config( 2313 ctx: Context, 2314 destination_id: Annotated[ 2315 str, 2316 Field(description="The ID of the deployed destination to update."), 2317 ], 2318 config: Annotated[ 2319 dict | str, 2320 Field( 2321 description="New configuration for the destination connector.", 2322 ), 2323 ], 2324 config_secret_name: Annotated[ 2325 str | None, 2326 Field( 2327 description="The name of the secret containing the configuration.", 2328 default=None, 2329 ), 2330 ], 2331 *, 2332 workspace_id: Annotated[ 2333 str | None, 2334 Field( 2335 description=WORKSPACE_ID_TIP_TEXT, 2336 default=None, 2337 ), 2338 ], 2339) -> str: 2340 """Update a deployed destination connector's configuration on Airbyte Cloud. 2341 2342 This is a destructive operation that can break existing connections if the 2343 configuration is changed incorrectly. Use with caution. 2344 """ 2345 check_guid_created_in_session(destination_id) 2346 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2347 destination = workspace.get_destination(destination_id=destination_id) 2348 2349 config_dict = resolve_connector_config( 2350 config=config, 2351 config_secret_name=config_secret_name, 2352 config_spec_jsonschema=None, # We don't have the spec here 2353 ) 2354 2355 destination.update_config(config=config_dict) 2356 return ( 2357 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2358 )
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2361@mcp_tool( 2362 open_world=True, 2363 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2364) 2365def rename_cloud_connection( 2366 ctx: Context, 2367 connection_id: Annotated[ 2368 str, 2369 Field(description="The ID of the connection to rename."), 2370 ], 2371 name: Annotated[ 2372 str, 2373 Field(description="New name for the connection."), 2374 ], 2375 *, 2376 workspace_id: Annotated[ 2377 str | None, 2378 Field( 2379 description=WORKSPACE_ID_TIP_TEXT, 2380 default=None, 2381 ), 2382 ], 2383) -> str: 2384 """Rename a connection on Airbyte Cloud.""" 2385 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2386 connection = workspace.get_connection(connection_id=connection_id) 2387 connection.rename(name=name) 2388 return ( 2389 f"Successfully renamed connection '{connection_id}' to '{name}'. " 2390 f"URL: {connection.connection_url}" 2391 )
Rename a connection on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2394@mcp_tool( 2395 destructive=True, 2396 open_world=True, 2397 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2398) 2399def set_cloud_connection_table_prefix( 2400 ctx: Context, 2401 connection_id: Annotated[ 2402 str, 2403 Field(description="The ID of the connection to update."), 2404 ], 2405 prefix: Annotated[ 2406 str, 2407 Field(description="New table prefix to use when syncing to the destination."), 2408 ], 2409 *, 2410 workspace_id: Annotated[ 2411 str | None, 2412 Field( 2413 description=WORKSPACE_ID_TIP_TEXT, 2414 default=None, 2415 ), 2416 ], 2417) -> str: 2418 """Set the table prefix for a connection on Airbyte Cloud. 2419 2420 This is a destructive operation that can break downstream dependencies if the 2421 table prefix is changed incorrectly. Use with caution. 2422 """ 2423 check_guid_created_in_session(connection_id) 2424 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2425 connection = workspace.get_connection(connection_id=connection_id) 2426 connection.set_table_prefix(prefix=prefix) 2427 return ( 2428 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2429 f"URL: {connection.connection_url}" 2430 )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the
table prefix is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2433@mcp_tool( 2434 destructive=True, 2435 open_world=True, 2436 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2437) 2438def set_cloud_connection_selected_streams( 2439 ctx: Context, 2440 connection_id: Annotated[ 2441 str, 2442 Field(description="The ID of the connection to update."), 2443 ], 2444 stream_names: Annotated[ 2445 str | list[str], 2446 Field( 2447 description=( 2448 "The selected stream names to sync within the connection. " 2449 "Must be an explicit stream name or list of streams." 2450 ) 2451 ), 2452 ], 2453 *, 2454 workspace_id: Annotated[ 2455 str | None, 2456 Field( 2457 description=WORKSPACE_ID_TIP_TEXT, 2458 default=None, 2459 ), 2460 ], 2461) -> str: 2462 """Set the selected streams for a connection on Airbyte Cloud. 2463 2464 This is a destructive operation that can break existing connections if the 2465 stream selection is changed incorrectly. Use with caution. 2466 """ 2467 check_guid_created_in_session(connection_id) 2468 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2469 connection = workspace.get_connection(connection_id=connection_id) 2470 2471 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2472 connection.set_selected_streams(stream_names=resolved_streams_list) 2473 2474 return ( 2475 f"Successfully set selected streams for connection '{connection_id}' " 2476 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2477 )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the
stream selection is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2480@mcp_tool( 2481 open_world=True, 2482 destructive=True, 2483 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2484) 2485def update_cloud_connection( 2486 ctx: Context, 2487 connection_id: Annotated[ 2488 str, 2489 Field(description="The ID of the connection to update."), 2490 ], 2491 *, 2492 enabled: Annotated[ 2493 bool | None, 2494 Field( 2495 description=( 2496 "Set the connection's enabled status. " 2497 "True enables the connection (status='active'), " 2498 "False disables it (status='inactive'). " 2499 "Leave unset to keep the current status." 2500 ), 2501 default=None, 2502 ), 2503 ], 2504 cron_expression: Annotated[ 2505 str | None, 2506 Field( 2507 description=( 2508 "A cron expression defining when syncs should run. " 2509 "Examples: '0 0 * * *' (daily at midnight UTC), " 2510 "'0 */6 * * *' (every 6 hours), " 2511 "'0 0 * * 0' (weekly on Sunday at midnight UTC). " 2512 "Leave unset to keep the current schedule. " 2513 "Cannot be used together with 'manual_schedule'." 2514 ), 2515 default=None, 2516 ), 2517 ], 2518 manual_schedule: Annotated[ 2519 bool | None, 2520 Field( 2521 description=( 2522 "Set to True to disable automatic syncs (manual scheduling only). " 2523 "Syncs will only run when manually triggered. " 2524 "Cannot be used together with 'cron_expression'." 2525 ), 2526 default=None, 2527 ), 2528 ], 2529 workspace_id: Annotated[ 2530 str | None, 2531 Field( 2532 description=WORKSPACE_ID_TIP_TEXT, 2533 default=None, 2534 ), 2535 ], 2536) -> str: 2537 """Update a connection's settings on Airbyte Cloud. 2538 2539 This tool allows updating multiple connection settings in a single call: 2540 - Enable or disable the connection 2541 - Set a cron schedule for automatic syncs 2542 - Switch to manual scheduling (no automatic syncs) 2543 2544 At least one setting must be provided. The 'cron_expression' and 'manual_schedule' 2545 parameters are mutually exclusive. 2546 """ 2547 check_guid_created_in_session(connection_id) 2548 2549 # Validate that at least one setting is provided 2550 if enabled is None and cron_expression is None and manual_schedule is None: 2551 raise ValueError( 2552 "At least one setting must be provided: 'enabled', 'cron_expression', " 2553 "or 'manual_schedule'." 2554 ) 2555 2556 # Validate mutually exclusive schedule options 2557 if cron_expression is not None and manual_schedule is True: 2558 raise ValueError( 2559 "Cannot specify both 'cron_expression' and 'manual_schedule=True'. " 2560 "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' " 2561 "for manual-only syncs." 2562 ) 2563 2564 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2565 connection = workspace.get_connection(connection_id=connection_id) 2566 2567 changes_made: list[str] = [] 2568 2569 # Apply enabled status change 2570 if enabled is not None: 2571 connection.set_enabled(enabled=enabled) 2572 status_str = "enabled" if enabled else "disabled" 2573 changes_made.append(f"status set to '{status_str}'") 2574 2575 # Apply schedule change 2576 if cron_expression is not None: 2577 connection.set_schedule(cron_expression=cron_expression) 2578 changes_made.append(f"schedule set to '{cron_expression}'") 2579 elif manual_schedule is True: 2580 connection.set_manual_schedule() 2581 changes_made.append("schedule set to 'manual'") 2582 2583 changes_summary = ", ".join(changes_made) 2584 return ( 2585 f"Successfully updated connection '{connection_id}': {changes_summary}. " 2586 f"URL: {connection.connection_url}" 2587 )
Update a connection's settings on Airbyte Cloud.
This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)
At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
parameters are mutually exclusive.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2590@mcp_tool( 2591 read_only=True, 2592 idempotent=True, 2593 open_world=True, 2594 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2595) 2596def get_connection_artifact( 2597 ctx: Context, 2598 connection_id: Annotated[ 2599 str, 2600 Field(description="The ID of the Airbyte Cloud connection."), 2601 ], 2602 artifact_type: Annotated[ 2603 Literal["state", "catalog"], 2604 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2605 ], 2606 *, 2607 workspace_id: Annotated[ 2608 str | None, 2609 Field( 2610 description=WORKSPACE_ID_TIP_TEXT, 2611 default=None, 2612 ), 2613 ], 2614) -> dict[str, Any]: 2615 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2616 2617 Retrieves the specified artifact for a connection: 2618 - 'state': Returns the full raw connection state including stateType and all 2619 state data, or {"ERROR": "..."} if no state is set. 2620 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2621 or {"ERROR": "..."} if not found. 2622 """ 2623 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2624 connection = workspace.get_connection(connection_id=connection_id) 2625 2626 if artifact_type == "state": 2627 result = connection.dump_raw_state() 2628 if result.get("stateType") == "not_set": 2629 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2630 return result 2631 2632 # artifact_type == "catalog" 2633 result = connection.dump_raw_catalog() 2634 if result is None: 2635 return {"ERROR": "No catalog found for this connection"} 2636 return result
Get a connection artifact (state or catalog) from Airbyte Cloud.
Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all
state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict,
or {"ERROR": "..."} if not found.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
2676def register_cloud_tools(app: FastMCP) -> None: 2677 """Register cloud tools with the FastMCP app. 2678 2679 Args: 2680 app: FastMCP application instance 2681 """ 2682 exclude_args = ["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None 2683 if exclude_args: 2684 _add_defaults_for_exclude_args(exclude_args) 2685 register_mcp_tools( 2686 app, 2687 mcp_module=__name__, 2688 exclude_args=exclude_args, 2689 )
Register cloud tools with the FastMCP app.
Arguments:
- app: FastMCP application instance