airbyte.mcp.cloud
Airbyte Cloud MCP operations.
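The tools in this module resolve Airbyte Cloud credentials from MCP config or the `AIRBYTE_CLOUD_*` environment variables and wrap PyAirbyte's `CloudWorkspace` API. As a rough illustration only (a sketch, not part of this module), the credential resolution these tools perform is roughly equivalent to constructing a workspace directly:

# Illustrative sketch (assumes the AIRBYTE_CLOUD_* environment variables are set);
# the MCP tools below perform the same resolution via _get_cloud_workspace().
import os

from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.secrets import SecretString

workspace = CloudWorkspace(
    workspace_id=os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"],
    client_id=SecretString(os.environ["AIRBYTE_CLOUD_CLIENT_ID"]),
    client_secret=SecretString(os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"]),
)
for connection in workspace.list_connections():
    print(connection.connection_id, connection.connection_url)

Module source follows.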
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from pathlib import Path
from typing import Annotated, Any, Literal, cast

from airbyte_api.models import JobTypeEnum
from fastmcp import Context, FastMCP
from fastmcp_extensions import get_mcp_config, mcp_tool, register_mcp_tools
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte._util import api_util
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.constants import FAILED_STATUSES
from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
from airbyte.constants import (
    MCP_CONFIG_API_URL,
    MCP_CONFIG_BEARER_TOKEN,
    MCP_CONFIG_CLIENT_ID,
    MCP_CONFIG_CLIENT_SECRET,
    MCP_CONFIG_WORKSPACE_ID,
)
from airbyte.destinations.util import get_noop_destination
from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
from airbyte.mcp._arg_resolvers import resolve_connector_config, resolve_list_of_strings
from airbyte.mcp._tool_utils import (
    AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET,
    check_guid_created_in_session,
    register_guid_created_in_session,
)
from airbyte.secrets import SecretString


CLOUD_AUTH_TIP_TEXT = (
    "By default, the `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`, "
    "and `AIRBYTE_CLOUD_WORKSPACE_ID` environment variables "
    "will be used to authenticate with the Airbyte Cloud API."
)
WORKSPACE_ID_TIP_TEXT = "Workspace ID. Defaults to `AIRBYTE_CLOUD_WORKSPACE_ID` env var."


class CloudSourceResult(BaseModel):
    """Information about a deployed source connector in Airbyte Cloud."""

    id: str
    """The source ID."""
    name: str
    """Display name of the source."""
    url: str
    """Web URL for managing this source in Airbyte Cloud."""


class CloudDestinationResult(BaseModel):
    """Information about a deployed destination connector in Airbyte Cloud."""

    id: str
    """The destination ID."""
    name: str
    """Display name of the destination."""
    url: str
    """Web URL for managing this destination in Airbyte Cloud."""


class CloudConnectionResult(BaseModel):
    """Information about a deployed connection in Airbyte Cloud."""

    id: str
    """The connection ID."""
    name: str
    """Display name of the connection."""
    url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    destination_id: str
    """ID of the destination used by this connection."""
    last_job_status: str | None = None
    """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled').
    Only populated when with_connection_status=True."""
    last_job_id: int | None = None
    """Job ID of the most recent completed sync. Only populated when with_connection_status=True."""
    last_job_time: str | None = None
    """ISO 8601 timestamp of the most recent completed sync.
    Only populated when with_connection_status=True."""
    currently_running_job_id: int | None = None
    """Job ID of a currently running sync, if any.
    Only populated when with_connection_status=True."""
    currently_running_job_start_time: str | None = None
    """ISO 8601 timestamp of when the currently running sync started.
    Only populated when with_connection_status=True."""


class CloudSourceDetails(BaseModel):
    """Detailed information about a deployed source connector in Airbyte Cloud."""

    source_id: str
    """The source ID."""
    source_name: str
    """Display name of the source."""
    source_url: str
    """Web URL for managing this source in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'source-postgres')."""


class CloudDestinationDetails(BaseModel):
    """Detailed information about a deployed destination connector in Airbyte Cloud."""

    destination_id: str
    """The destination ID."""
    destination_name: str
    """Display name of the destination."""
    destination_url: str
    """Web URL for managing this destination in Airbyte Cloud."""
    connector_definition_id: str
    """The connector definition ID (e.g., the ID for 'destination-snowflake')."""


class CloudConnectionDetails(BaseModel):
    """Detailed information about a deployed connection in Airbyte Cloud."""

    connection_id: str
    """The connection ID."""
    connection_name: str
    """Display name of the connection."""
    connection_url: str
    """Web URL for managing this connection in Airbyte Cloud."""
    source_id: str
    """ID of the source used by this connection."""
    source_name: str
    """Display name of the source."""
    destination_id: str
    """ID of the destination used by this connection."""
    destination_name: str
    """Display name of the destination."""
    selected_streams: list[str]
    """List of stream names selected for syncing."""
    table_prefix: str | None
    """Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
    """Information about an organization in Airbyte Cloud."""

    id: str
    """The organization ID."""
    name: str
    """Display name of the organization."""
    email: str
    """Email associated with the organization."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed')."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state."""


class CloudWorkspaceResult(BaseModel):
    """Information about a workspace in Airbyte Cloud."""

    workspace_id: str
    """The workspace ID."""
    workspace_name: str
    """Display name of the workspace."""
    workspace_url: str | None = None
    """URL to access the workspace in Airbyte Cloud."""
    organization_id: str
    """ID of the organization (requires ORGANIZATION_READER permission)."""
    organization_name: str | None = None
    """Name of the organization (requires ORGANIZATION_READER permission)."""
    payment_status: str | None = None
    """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked').
    When 'disabled', syncs are blocked due to unpaid invoices.
    Requires ORGANIZATION_READER permission."""
    subscription_status: str | None = None
    """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed',
    'unsubscribed'). Requires ORGANIZATION_READER permission."""
    is_account_locked: bool = False
    """Whether the account is locked due to billing issues.
    True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'.
    Defaults to False unless we have affirmative evidence of a locked state.
    Requires ORGANIZATION_READER permission."""


class LogReadResult(BaseModel):
    """Result of reading sync logs with pagination support."""

    job_id: int
    """The job ID the logs belong to."""
    attempt_number: int
    """The attempt number the logs belong to."""
    log_text: str
    """The string containing the log text we are returning."""
    log_text_start_line: int
    """1-based line index of the first line returned."""
    log_text_line_count: int
    """Count of lines we are returning."""
    total_log_lines_available: int
    """Total number of log lines available, shows if any lines were missed due to the limit."""


class SyncJobResult(BaseModel):
    """Information about a sync job."""

    job_id: int
    """The job ID."""
    status: str
    """The job status (e.g., 'succeeded', 'failed', 'running', 'pending')."""
    bytes_synced: int
    """Number of bytes synced in this job."""
    records_synced: int
    """Number of records synced in this job."""
    start_time: str
    """ISO 8601 timestamp of when the job started."""
    job_url: str
    """URL to view the job in Airbyte Cloud."""


class SyncJobListResult(BaseModel):
    """Result of listing sync jobs with pagination support."""

    jobs: list[SyncJobResult]
    """List of sync jobs."""
    jobs_count: int
    """Number of jobs returned in this response."""
    jobs_offset: int
    """Offset used for this request (0 if not specified)."""
    from_tail: bool
    """Whether jobs are ordered newest-first (True) or oldest-first (False)."""


def _get_cloud_workspace(
    ctx: Context,
    workspace_id: str | None = None,
) -> CloudWorkspace:
    """Get an authenticated CloudWorkspace.

    Resolves credentials from multiple sources via MCP config args in order:
    1. HTTP headers (when running as MCP server with HTTP/SSE transport)
    2. Environment variables

    The ctx parameter provides access to MCP config values that are resolved
    from HTTP headers or environment variables based on the config args
    defined in server.py.
251 """ 252 resolved_workspace_id = workspace_id or get_mcp_config(ctx, MCP_CONFIG_WORKSPACE_ID) 253 if not resolved_workspace_id: 254 raise PyAirbyteInputError( 255 message="Workspace ID is required but not provided.", 256 guidance="Set AIRBYTE_CLOUD_WORKSPACE_ID env var or pass workspace_id parameter.", 257 ) 258 259 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 260 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 261 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 262 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 263 264 return CloudWorkspace( 265 workspace_id=resolved_workspace_id, 266 client_id=SecretString(client_id) if client_id else None, 267 client_secret=SecretString(client_secret) if client_secret else None, 268 bearer_token=SecretString(bearer_token) if bearer_token else None, 269 api_root=api_url, 270 ) 271 272 273@mcp_tool( 274 open_world=True, 275 extra_help_text=CLOUD_AUTH_TIP_TEXT, 276) 277def deploy_source_to_cloud( 278 ctx: Context, 279 source_name: Annotated[ 280 str, 281 Field(description="The name to use when deploying the source."), 282 ], 283 source_connector_name: Annotated[ 284 str, 285 Field(description="The name of the source connector (e.g., 'source-faker')."), 286 ], 287 *, 288 workspace_id: Annotated[ 289 str | None, 290 Field( 291 description=WORKSPACE_ID_TIP_TEXT, 292 default=None, 293 ), 294 ], 295 config: Annotated[ 296 dict | str | None, 297 Field( 298 description="The configuration for the source connector.", 299 default=None, 300 ), 301 ], 302 config_secret_name: Annotated[ 303 str | None, 304 Field( 305 description="The name of the secret containing the configuration.", 306 default=None, 307 ), 308 ], 309 unique: Annotated[ 310 bool, 311 Field( 312 description="Whether to require a unique name.", 313 default=True, 314 ), 315 ], 316) -> str: 317 """Deploy a source connector to Airbyte Cloud.""" 318 source = get_source( 319 source_connector_name, 320 no_executor=True, 321 ) 322 config_dict = resolve_connector_config( 323 config=config, 324 config_secret_name=config_secret_name, 325 config_spec_jsonschema=source.config_spec, 326 ) 327 source.set_config(config_dict, validate=True) 328 329 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 330 deployed_source = workspace.deploy_source( 331 name=source_name, 332 source=source, 333 unique=unique, 334 ) 335 336 register_guid_created_in_session(deployed_source.connector_id) 337 return ( 338 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 339 f" and URL: {deployed_source.connector_url}" 340 ) 341 342 343@mcp_tool( 344 open_world=True, 345 extra_help_text=CLOUD_AUTH_TIP_TEXT, 346) 347def deploy_destination_to_cloud( 348 ctx: Context, 349 destination_name: Annotated[ 350 str, 351 Field(description="The name to use when deploying the destination."), 352 ], 353 destination_connector_name: Annotated[ 354 str, 355 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 356 ], 357 *, 358 workspace_id: Annotated[ 359 str | None, 360 Field( 361 description=WORKSPACE_ID_TIP_TEXT, 362 default=None, 363 ), 364 ], 365 config: Annotated[ 366 dict | str | None, 367 Field( 368 description="The configuration for the destination connector.", 369 default=None, 370 ), 371 ], 372 config_secret_name: Annotated[ 373 str | None, 374 Field( 375 description="The name of the secret containing the configuration.", 376 default=None, 377 ), 378 ], 379 unique: Annotated[ 380 bool, 381 
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ],
) -> str:
    """Deploy a destination connector to Airbyte Cloud."""
    destination = get_destination(
        destination_connector_name,
        no_executor=True,
    )
    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=destination.config_spec,
    )
    destination.set_config(config_dict, validate=True)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=destination_name,
        destination=destination,
        unique=unique,
    )

    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed destination '{destination_name}' "
        f"with ID: {deployed_destination.connector_id}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def create_connection_on_cloud(
    ctx: Context,
    connection_name: Annotated[
        str,
        Field(description="The name of the connection."),
    ],
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source."),
    ],
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination."),
    ],
    selected_streams: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams. "
                "Cannot be empty or '*'."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    table_prefix: Annotated[
        str | None,
        Field(
            description="Optional table prefix to use when syncing to the destination.",
            default=None,
        ),
    ],
) -> str:
    """Create a connection between a deployed source and destination on Airbyte Cloud."""
    resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_connection = workspace.deploy_connection(
        connection_name=connection_name,
        source=source_id,
        destination=destination_id,
        selected_streams=resolved_streams_list,
        table_prefix=table_prefix,
    )

    register_guid_created_in_session(deployed_connection.connection_id)
    return (
        f"Successfully created connection '{connection_name}' "
        f"with ID '{deployed_connection.connection_id}' and "
        f"URL: {deployed_connection.connection_url}"
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def run_cloud_sync(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    wait: Annotated[
        bool,
        Field(
            description=(
                "Whether to wait for the sync to complete. Since a sync can take between several "
                "minutes and several hours, this option is not recommended for most "
                "scenarios."
            ),
            default=False,
        ),
    ],
    wait_timeout: Annotated[
        int,
        Field(
            description="Maximum time to wait for sync completion (seconds).",
            default=300,
        ),
    ],
) -> str:
    """Run a sync job on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    if wait:
        status = sync_result.get_job_status()
        return (
            f"Sync completed with status: {status}. "
            f"Job ID is '{sync_result.job_id}' and "
            f"job URL is: {sync_result.job_url}"
        )
    return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def check_airbyte_cloud_workspace(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> CloudWorkspaceResult:
    """Check if we have a valid Airbyte Cloud connection and return workspace info.

    Returns workspace details including workspace ID, name, organization info, and billing status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    # Get workspace details from the public API using workspace's credentials
    workspace_response = api_util.get_workspace(
        workspace_id=workspace.workspace_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    # Try to get organization info (including billing), but fail gracefully if we don't have
    # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the
    # organization, which may not be available with workspace-scoped credentials.
    organization = workspace.get_organization(raise_on_error=False)

    return CloudWorkspaceResult(
        workspace_id=workspace_response.workspace_id,
        workspace_name=workspace_response.name,
        workspace_url=workspace.workspace_url,
        organization_id=(
            organization.organization_id
            if organization
            else "[unavailable - requires ORGANIZATION_READER permission]"
        ),
        organization_name=organization.organization_name if organization else None,
        payment_status=organization.payment_status if organization else None,
        subscription_status=organization.subscription_status if organization else None,
        is_account_locked=organization.is_account_locked if organization else False,
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def deploy_noop_destination_to_cloud(
    ctx: Context,
    name: str = "No-op Destination",
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    unique: bool = True,
) -> str:
    """Deploy the No-op destination to Airbyte Cloud for testing purposes."""
    destination = get_noop_destination()
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    deployed_destination = workspace.deploy_destination(
        name=name,
        destination=destination,
        unique=unique,
    )
    register_guid_created_in_session(deployed_destination.connector_id)
    return (
        f"Successfully deployed No-op Destination "
        f"with ID '{deployed_destination.connector_id}' and "
        f"URL: {deployed_destination.connector_url}"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_status(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(
            description="The ID of the Airbyte Cloud connection.",
        ),
    ],
    job_id: Annotated[
        int | None,
        Field(
            description="Optional job ID. If not provided, the latest job will be used.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    include_attempts: Annotated[
        bool,
        Field(
            description="Whether to include detailed attempts information.",
            default=False,
        ),
    ],
) -> dict[str, Any]:
    """Get the status of a sync job from the Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    # If a job ID is provided, get the job by ID.
    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        return {"status": None, "job_id": None, "attempts": []}

    result = {
        "status": sync_result.get_job_status(),
        "job_id": sync_result.job_id,
        "bytes_synced": sync_result.bytes_synced,
        "records_synced": sync_result.records_synced,
        "start_time": sync_result.start_time.isoformat(),
        "job_url": sync_result.job_url,
        "attempts": [],
    }

    if include_attempts:
        attempts = sync_result.get_attempts()
        result["attempts"] = [
            {
                "attempt_number": attempt.attempt_number,
                "attempt_id": attempt.attempt_id,
                "status": attempt.status,
                "bytes_synced": attempt.bytes_synced,
                "records_synced": attempt.records_synced,
                "created_at": attempt.created_at.isoformat(),
            }
            for attempt in attempts
        ]

    return result


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_sync_jobs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_jobs: Annotated[
        int,
        Field(
            description=(
                "Maximum number of jobs to return. "
                "Defaults to 20 if not specified. "
                "Maximum allowed value is 500."
            ),
            default=20,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "When True, jobs are ordered newest-first (createdAt DESC). "
                "When False, jobs are ordered oldest-first (createdAt ASC). "
                "Defaults to True if `jobs_offset` is not specified. "
                "Cannot combine `from_tail=True` with `jobs_offset`."
            ),
            default=None,
        ),
    ],
    jobs_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of jobs to skip from the beginning. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
    job_type: Annotated[
        JobTypeEnum | None,
        Field(
            description=(
                "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. "
                "If not specified, defaults to sync and reset jobs only (API default). "
                "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs."
            ),
            default=None,
        ),
    ],
) -> SyncJobListResult:
    """List sync jobs for a connection with pagination support.

    This tool allows you to retrieve a list of sync jobs for a connection,
    with control over ordering and pagination. By default, jobs are returned
    newest-first (from_tail=True).
758 """ 759 # Validate that jobs_offset and from_tail are not both set 760 if jobs_offset is not None and from_tail is True: 761 raise PyAirbyteInputError( 762 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 763 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 764 ) 765 766 # Default to from_tail=True if neither is specified 767 if from_tail is None and jobs_offset is None: 768 from_tail = True 769 elif from_tail is None: 770 from_tail = False 771 772 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 773 connection = workspace.get_connection(connection_id=connection_id) 774 775 # Cap at 500 to avoid overloading agent context 776 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 777 778 sync_results = connection.get_previous_sync_logs( 779 limit=effective_limit, 780 offset=jobs_offset, 781 from_tail=from_tail, 782 job_type=job_type, 783 ) 784 785 jobs = [ 786 SyncJobResult( 787 job_id=sync_result.job_id, 788 status=str(sync_result.get_job_status()), 789 bytes_synced=sync_result.bytes_synced, 790 records_synced=sync_result.records_synced, 791 start_time=sync_result.start_time.isoformat(), 792 job_url=sync_result.job_url, 793 ) 794 for sync_result in sync_results 795 ] 796 797 return SyncJobListResult( 798 jobs=jobs, 799 jobs_count=len(jobs), 800 jobs_offset=jobs_offset or 0, 801 from_tail=from_tail, 802 ) 803 804 805@mcp_tool( 806 read_only=True, 807 idempotent=True, 808 open_world=True, 809 extra_help_text=CLOUD_AUTH_TIP_TEXT, 810) 811def list_deployed_cloud_source_connectors( 812 ctx: Context, 813 *, 814 workspace_id: Annotated[ 815 str | None, 816 Field( 817 description=WORKSPACE_ID_TIP_TEXT, 818 default=None, 819 ), 820 ], 821 name_contains: Annotated[ 822 str | None, 823 Field( 824 description="Optional case-insensitive substring to filter sources by name", 825 default=None, 826 ), 827 ], 828 max_items_limit: Annotated[ 829 int | None, 830 Field( 831 description="Optional maximum number of items to return (default: no limit)", 832 default=None, 833 ), 834 ], 835) -> list[CloudSourceResult]: 836 """List all deployed source connectors in the Airbyte Cloud workspace.""" 837 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 838 sources = workspace.list_sources() 839 840 # Filter by name if requested 841 if name_contains: 842 needle = name_contains.lower() 843 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 844 845 # Apply limit if requested 846 if max_items_limit is not None: 847 sources = sources[:max_items_limit] 848 849 # Note: name and url are guaranteed non-null from list API responses 850 return [ 851 CloudSourceResult( 852 id=source.source_id, 853 name=cast(str, source.name), 854 url=cast(str, source.connector_url), 855 ) 856 for source in sources 857 ] 858 859 860@mcp_tool( 861 read_only=True, 862 idempotent=True, 863 open_world=True, 864 extra_help_text=CLOUD_AUTH_TIP_TEXT, 865) 866def list_deployed_cloud_destination_connectors( 867 ctx: Context, 868 *, 869 workspace_id: Annotated[ 870 str | None, 871 Field( 872 description=WORKSPACE_ID_TIP_TEXT, 873 default=None, 874 ), 875 ], 876 name_contains: Annotated[ 877 str | None, 878 Field( 879 description="Optional case-insensitive substring to filter destinations by name", 880 default=None, 881 ), 882 ], 883 max_items_limit: Annotated[ 884 int | None, 885 Field( 886 description="Optional maximum number of items to return (default: no limit)", 887 default=None, 888 ), 889 ], 890) -> list[CloudDestinationResult]: 891 
"""List all deployed destination connectors in the Airbyte Cloud workspace.""" 892 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 893 destinations = workspace.list_destinations() 894 895 # Filter by name if requested 896 if name_contains: 897 needle = name_contains.lower() 898 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 899 900 # Apply limit if requested 901 if max_items_limit is not None: 902 destinations = destinations[:max_items_limit] 903 904 # Note: name and url are guaranteed non-null from list API responses 905 return [ 906 CloudDestinationResult( 907 id=destination.destination_id, 908 name=cast(str, destination.name), 909 url=cast(str, destination.connector_url), 910 ) 911 for destination in destinations 912 ] 913 914 915@mcp_tool( 916 read_only=True, 917 idempotent=True, 918 open_world=True, 919 extra_help_text=CLOUD_AUTH_TIP_TEXT, 920) 921def describe_cloud_source( 922 ctx: Context, 923 source_id: Annotated[ 924 str, 925 Field(description="The ID of the source to describe."), 926 ], 927 *, 928 workspace_id: Annotated[ 929 str | None, 930 Field( 931 description=WORKSPACE_ID_TIP_TEXT, 932 default=None, 933 ), 934 ], 935) -> CloudSourceDetails: 936 """Get detailed information about a specific deployed source connector.""" 937 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 938 source = workspace.get_source(source_id=source_id) 939 940 # Access name property to ensure _connector_info is populated 941 source_name = cast(str, source.name) 942 943 return CloudSourceDetails( 944 source_id=source.source_id, 945 source_name=source_name, 946 source_url=source.connector_url, 947 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 948 ) 949 950 951@mcp_tool( 952 read_only=True, 953 idempotent=True, 954 open_world=True, 955 extra_help_text=CLOUD_AUTH_TIP_TEXT, 956) 957def describe_cloud_destination( 958 ctx: Context, 959 destination_id: Annotated[ 960 str, 961 Field(description="The ID of the destination to describe."), 962 ], 963 *, 964 workspace_id: Annotated[ 965 str | None, 966 Field( 967 description=WORKSPACE_ID_TIP_TEXT, 968 default=None, 969 ), 970 ], 971) -> CloudDestinationDetails: 972 """Get detailed information about a specific deployed destination connector.""" 973 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 974 destination = workspace.get_destination(destination_id=destination_id) 975 976 # Access name property to ensure _connector_info is populated 977 destination_name = cast(str, destination.name) 978 979 return CloudDestinationDetails( 980 destination_id=destination.destination_id, 981 destination_name=destination_name, 982 destination_url=destination.connector_url, 983 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 984 ) 985 986 987@mcp_tool( 988 read_only=True, 989 idempotent=True, 990 open_world=True, 991 extra_help_text=CLOUD_AUTH_TIP_TEXT, 992) 993def describe_cloud_connection( 994 ctx: Context, 995 connection_id: Annotated[ 996 str, 997 Field(description="The ID of the connection to describe."), 998 ], 999 *, 1000 workspace_id: Annotated[ 1001 str | None, 1002 Field( 1003 description=WORKSPACE_ID_TIP_TEXT, 1004 default=None, 1005 ), 1006 ], 1007) -> CloudConnectionDetails: 1008 """Get detailed information about a specific deployed connection.""" 1009 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1010 connection = 
workspace.get_connection(connection_id=connection_id) 1011 1012 return CloudConnectionDetails( 1013 connection_id=connection.connection_id, 1014 connection_name=cast(str, connection.name), 1015 connection_url=cast(str, connection.connection_url), 1016 source_id=connection.source_id, 1017 source_name=cast(str, connection.source.name), 1018 destination_id=connection.destination_id, 1019 destination_name=cast(str, connection.destination.name), 1020 selected_streams=connection.stream_names, 1021 table_prefix=connection.table_prefix, 1022 ) 1023 1024 1025@mcp_tool( 1026 read_only=True, 1027 idempotent=True, 1028 open_world=True, 1029 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1030) 1031def get_cloud_sync_logs( 1032 ctx: Context, 1033 connection_id: Annotated[ 1034 str, 1035 Field(description="The ID of the Airbyte Cloud connection."), 1036 ], 1037 job_id: Annotated[ 1038 int | None, 1039 Field(description="Optional job ID. If not provided, the latest job will be used."), 1040 ] = None, 1041 attempt_number: Annotated[ 1042 int | None, 1043 Field( 1044 description="Optional attempt number. If not provided, the latest attempt will be used." 1045 ), 1046 ] = None, 1047 *, 1048 workspace_id: Annotated[ 1049 str | None, 1050 Field( 1051 description=WORKSPACE_ID_TIP_TEXT, 1052 default=None, 1053 ), 1054 ], 1055 max_lines: Annotated[ 1056 int, 1057 Field( 1058 description=( 1059 "Maximum number of lines to return. " 1060 "Defaults to 4000 if not specified. " 1061 "If '0' is provided, no limit is applied." 1062 ), 1063 default=4000, 1064 ), 1065 ], 1066 from_tail: Annotated[ 1067 bool | None, 1068 Field( 1069 description=( 1070 "Pull from the end of the log text if total lines is greater than 'max_lines'. " 1071 "Defaults to True if `line_offset` is not specified. " 1072 "Cannot combine `from_tail=True` with `line_offset`." 1073 ), 1074 default=None, 1075 ), 1076 ], 1077 line_offset: Annotated[ 1078 int | None, 1079 Field( 1080 description=( 1081 "Number of lines to skip from the beginning of the logs. " 1082 "Cannot be combined with `from_tail=True`." 
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    if from_tail is None and line_offset is None:
        from_tail = True
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]
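    # Worked example (comment added for clarity): with total_lines=10_000 and
    # max_lines=4_000, from_tail=True gives start_index=6_000 and returns lines
    # 6_001-10_000 (1-based); with from_tail=False and line_offset=500, lines
    # 501-4_500 are returned instead.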
(default: no limit)", 1200 default=None, 1201 ), 1202 ], 1203 with_connection_status: Annotated[ 1204 bool | None, 1205 Field( 1206 description="If True, include status info for each connection's most recent sync job", 1207 default=False, 1208 ), 1209 ], 1210 failing_connections_only: Annotated[ 1211 bool | None, 1212 Field( 1213 description="If True, only return connections with failed/cancelled last sync", 1214 default=False, 1215 ), 1216 ], 1217) -> list[CloudConnectionResult]: 1218 """List all deployed connections in the Airbyte Cloud workspace. 1219 1220 When with_connection_status is True, each connection result will include 1221 information about the most recent sync job status, skipping over any 1222 currently in-progress syncs to find the last completed job. 1223 1224 When failing_connections_only is True, only connections where the most 1225 recent completed sync job failed or was cancelled will be returned. 1226 This implicitly enables with_connection_status. 1227 """ 1228 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1229 connections = workspace.list_connections() 1230 1231 # Filter by name if requested 1232 if name_contains: 1233 needle = name_contains.lower() 1234 connections = [c for c in connections if c.name is not None and needle in c.name.lower()] 1235 1236 # If failing_connections_only is True, implicitly enable with_connection_status 1237 if failing_connections_only: 1238 with_connection_status = True 1239 1240 results: list[CloudConnectionResult] = [] 1241 1242 for connection in connections: 1243 last_job_status: str | None = None 1244 last_job_id: int | None = None 1245 last_job_time: str | None = None 1246 currently_running_job_id: int | None = None 1247 currently_running_job_start_time: str | None = None 1248 1249 if with_connection_status: 1250 sync_logs = connection.get_previous_sync_logs(limit=5) 1251 last_completed_job_status = None # Keep enum for comparison 1252 1253 for sync_result in sync_logs: 1254 job_status = sync_result.get_job_status() 1255 1256 if not sync_result.is_job_complete(): 1257 currently_running_job_id = sync_result.job_id 1258 currently_running_job_start_time = sync_result.start_time.isoformat() 1259 continue 1260 1261 last_completed_job_status = job_status 1262 last_job_status = str(job_status.value) if job_status else None 1263 last_job_id = sync_result.job_id 1264 last_job_time = sync_result.start_time.isoformat() 1265 break 1266 1267 if failing_connections_only and ( 1268 last_completed_job_status is None 1269 or last_completed_job_status not in FAILED_STATUSES 1270 ): 1271 continue 1272 1273 results.append( 1274 CloudConnectionResult( 1275 id=connection.connection_id, 1276 name=cast(str, connection.name), 1277 url=cast(str, connection.connection_url), 1278 source_id=connection.source_id, 1279 destination_id=connection.destination_id, 1280 last_job_status=last_job_status, 1281 last_job_id=last_job_id, 1282 last_job_time=last_job_time, 1283 currently_running_job_id=currently_running_job_id, 1284 currently_running_job_start_time=currently_running_job_start_time, 1285 ) 1286 ) 1287 1288 if max_items_limit is not None and len(results) >= max_items_limit: 1289 break 1290 1291 return results 1292 1293 1294def _resolve_organization( 1295 organization_id: str | None, 1296 organization_name: str | None, 1297 *, 1298 api_root: str, 1299 client_id: SecretString | None, 1300 client_secret: SecretString | None, 1301 bearer_token: SecretString | None = None, 1302) -> CloudOrganization: 1303 """Resolve organization from either ID 

    Args:
        organization_id: The organization ID (if provided directly)
        organization_name: The organization name (exact match required)
        api_root: The API root URL
        client_id: OAuth client ID (optional if bearer_token is provided)
        client_secret: OAuth client secret (optional if bearer_token is provided)
        bearer_token: Bearer token for authentication (optional if client credentials provided)

    Returns:
        A CloudOrganization object with credentials for lazy loading of billing info.

    Raises:
        PyAirbyteInputError: If neither or both parameters are provided,
            or if no organization matches the exact name
        AirbyteMissingResourceError: If the organization is not found
    """
    if organization_id and organization_name:
        raise PyAirbyteInputError(
            message="Provide either 'organization_id' or 'organization_name', not both."
        )
    if not organization_id and not organization_name:
        raise PyAirbyteInputError(
            message="Either 'organization_id' or 'organization_name' must be provided."
        )

    # Get all organizations for the user
    orgs = api_util.list_organizations_for_user(
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )

    org_response: api_util.models.OrganizationResponse | None = None

    if organization_id:
        # Find by ID
        matching_orgs = [org for org in orgs if org.organization_id == organization_id]
        if not matching_orgs:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_id": organization_id,
                    "message": f"No organization found with ID '{organization_id}' "
                    "for the current user.",
                },
            )
        org_response = matching_orgs[0]
    else:
        # Find by exact name match (case-sensitive)
        matching_orgs = [org for org in orgs if org.organization_name == organization_name]

        if not matching_orgs:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                context={
                    "organization_name": organization_name,
                    "message": f"No organization found with exact name '{organization_name}' "
                    "for the current user.",
                },
            )

        if len(matching_orgs) > 1:
            raise PyAirbyteInputError(
                message=f"Multiple organizations found with name '{organization_name}'. "
                "Please use 'organization_id' instead to specify the exact organization."
            )

        org_response = matching_orgs[0]

    # Return a CloudOrganization with credentials for lazy loading of billing info
    return CloudOrganization(
        organization_id=org_response.organization_id,
        organization_name=org_response.organization_name,
        email=org_response.email,
        api_root=api_root,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
    )


def _resolve_organization_id(
    organization_id: str | None,
    organization_name: str | None,
    *,
    api_root: str,
    client_id: SecretString | None,
    client_secret: SecretString | None,
    bearer_token: SecretString | None = None,
) -> str:
    """Resolve organization ID from either ID or exact name match.

    This is a convenience wrapper around _resolve_organization that returns just the ID.
1399 """ 1400 org = _resolve_organization( 1401 organization_id=organization_id, 1402 organization_name=organization_name, 1403 api_root=api_root, 1404 client_id=client_id, 1405 client_secret=client_secret, 1406 bearer_token=bearer_token, 1407 ) 1408 return org.organization_id 1409 1410 1411@mcp_tool( 1412 read_only=True, 1413 idempotent=True, 1414 open_world=True, 1415 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1416) 1417def list_cloud_workspaces( 1418 ctx: Context, 1419 *, 1420 organization_id: Annotated[ 1421 str | None, 1422 Field( 1423 description="Organization ID. Required if organization_name is not provided.", 1424 default=None, 1425 ), 1426 ], 1427 organization_name: Annotated[ 1428 str | None, 1429 Field( 1430 description=( 1431 "Organization name (exact match). " "Required if organization_id is not provided." 1432 ), 1433 default=None, 1434 ), 1435 ], 1436 name_contains: Annotated[ 1437 str | None, 1438 Field( 1439 description="Optional substring to filter workspaces by name (server-side filtering)", 1440 default=None, 1441 ), 1442 ], 1443 max_items_limit: Annotated[ 1444 int | None, 1445 Field( 1446 description="Optional maximum number of items to return (default: no limit)", 1447 default=None, 1448 ), 1449 ], 1450) -> list[CloudWorkspaceResult]: 1451 """List all workspaces in a specific organization. 1452 1453 Requires either organization_id OR organization_name (exact match) to be provided. 1454 This tool will NOT list workspaces across all organizations - you must specify 1455 which organization to list workspaces from. 1456 """ 1457 bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN) 1458 client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID) 1459 client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET) 1460 api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT 1461 1462 resolved_org_id = _resolve_organization_id( 1463 organization_id=organization_id, 1464 organization_name=organization_name, 1465 api_root=api_url, 1466 client_id=SecretString(client_id) if client_id else None, 1467 client_secret=SecretString(client_secret) if client_secret else None, 1468 bearer_token=SecretString(bearer_token) if bearer_token else None, 1469 ) 1470 1471 workspaces = api_util.list_workspaces_in_organization( 1472 organization_id=resolved_org_id, 1473 api_root=api_url, 1474 client_id=SecretString(client_id) if client_id else None, 1475 client_secret=SecretString(client_secret) if client_secret else None, 1476 bearer_token=SecretString(bearer_token) if bearer_token else None, 1477 name_contains=name_contains, 1478 max_items_limit=max_items_limit, 1479 ) 1480 1481 return [ 1482 CloudWorkspaceResult( 1483 workspace_id=ws.get("workspaceId", ""), 1484 workspace_name=ws.get("name", ""), 1485 organization_id=ws.get("organizationId", ""), 1486 ) 1487 for ws in workspaces 1488 ] 1489 1490 1491@mcp_tool( 1492 read_only=True, 1493 idempotent=True, 1494 open_world=True, 1495 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1496) 1497def describe_cloud_organization( 1498 ctx: Context, 1499 *, 1500 organization_id: Annotated[ 1501 str | None, 1502 Field( 1503 description="Organization ID. Required if organization_name is not provided.", 1504 default=None, 1505 ), 1506 ], 1507 organization_name: Annotated[ 1508 str | None, 1509 Field( 1510 description=( 1511 "Organization name (exact match). " "Required if organization_id is not provided." 1512 ), 1513 default=None, 1514 ), 1515 ], 1516) -> CloudOrganizationResult: 1517 """Get details about a specific organization including billing status. 

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    # CloudOrganization has lazy loading of billing properties
    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )


def _get_custom_source_definition_description(
    custom_source: CustomCloudSourceDefinition,
) -> str:
    return "\n".join(
        [
            f" - Custom Source Name: {custom_source.name}",
            f" - Definition ID: {custom_source.definition_id}",
            f" - Definition Version: {custom_source.version}",
            f" - Connector Builder Project ID: {custom_source.connector_builder_project_id}",
            f" - Connector Builder Project URL: {custom_source.connector_builder_project_url}",
        ]
    )


@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]


@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the manifest YAML content,
    which can be used to inspect or store the connector configuration locally.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
1732 """ 1733 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1734 definition = workspace.get_custom_source_definition( 1735 definition_id=definition_id, 1736 definition_type="yaml", 1737 ) 1738 1739 return { 1740 "definition_id": definition.definition_id, 1741 "name": definition.name, 1742 "version": definition.version, 1743 "connector_builder_project_id": definition.connector_builder_project_id, 1744 "connector_builder_project_url": definition.connector_builder_project_url, 1745 "manifest": definition.manifest, 1746 } 1747 1748 1749@mcp_tool( 1750 destructive=True, 1751 open_world=True, 1752) 1753def update_custom_source_definition( 1754 ctx: Context, 1755 definition_id: Annotated[ 1756 str, 1757 Field(description="The ID of the definition to update."), 1758 ], 1759 manifest_yaml: Annotated[ 1760 str | Path | None, 1761 Field( 1762 description=( 1763 "New manifest as YAML string or file path. " 1764 "Optional; omit to update only testing values." 1765 ), 1766 default=None, 1767 ), 1768 ] = None, 1769 *, 1770 workspace_id: Annotated[ 1771 str | None, 1772 Field( 1773 description=WORKSPACE_ID_TIP_TEXT, 1774 default=None, 1775 ), 1776 ], 1777 pre_validate: Annotated[ 1778 bool, 1779 Field( 1780 description="Whether to validate the manifest client-side before updating.", 1781 default=True, 1782 ), 1783 ] = True, 1784 testing_values: Annotated[ 1785 dict | str | None, 1786 Field( 1787 description=( 1788 "Optional testing configuration values for the Builder UI. " 1789 "Can be provided as a JSON object or JSON string. " 1790 "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. " 1791 "If provided, these values replace any existing testing values " 1792 "for the connector builder project. The entire testing values object " 1793 "is overwritten, so pass the full set of values you want to persist." 1794 ), 1795 default=None, 1796 ), 1797 ], 1798 testing_values_secret_name: Annotated[ 1799 str | None, 1800 Field( 1801 description=( 1802 "Optional name of a secret containing testing configuration values " 1803 "in JSON or YAML format. The secret will be resolved by the MCP " 1804 "server and merged into testing_values, with secret values taking " 1805 "precedence. This lets the agent reference secrets without sending " 1806 "raw values as tool arguments." 1807 ), 1808 default=None, 1809 ), 1810 ], 1811) -> str: 1812 """Update a custom YAML source definition in Airbyte Cloud. 1813 1814 Updates the manifest and/or testing values for an existing custom source definition. 1815 At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided. 1816 """ 1817 check_guid_created_in_session(definition_id) 1818 1819 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1820 1821 if manifest_yaml is None and testing_values is None and testing_values_secret_name is None: 1822 raise PyAirbyteInputError( 1823 message=( 1824 "At least one of manifest_yaml, testing_values, or testing_values_secret_name " 1825 "must be provided to update a custom source definition." 
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    if processed_manifest is not None:
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )


@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = definition.name

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
1934 ) 1935 return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})" 1936 1937 1938@mcp_tool( 1939 destructive=True, 1940 open_world=True, 1941 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1942) 1943def permanently_delete_cloud_source( 1944 ctx: Context, 1945 source_id: Annotated[ 1946 str, 1947 Field(description="The ID of the deployed source to delete."), 1948 ], 1949 name: Annotated[ 1950 str, 1951 Field(description="The expected name of the source (for verification)."), 1952 ], 1953) -> str: 1954 """Permanently delete a deployed source connector from Airbyte Cloud. 1955 1956 IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" 1957 (case insensitive). 1958 1959 If the source does not meet this requirement, the deletion will be rejected with a 1960 helpful error message. Instruct the user to rename the source appropriately to authorize 1961 the deletion. 1962 1963 The provided name must match the actual name of the source for the operation to proceed. 1964 This is a safety measure to ensure you are deleting the correct resource. 1965 """ 1966 check_guid_created_in_session(source_id) 1967 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 1968 source = workspace.get_source(source_id=source_id) 1969 actual_name: str = cast(str, source.name) 1970 1971 # Verify the name matches 1972 if actual_name != name: 1973 raise PyAirbyteInputError( 1974 message=( 1975 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 1976 "The provided name must exactly match the source's actual name. " 1977 "This is a safety measure to prevent accidental deletion." 1978 ), 1979 context={ 1980 "source_id": source_id, 1981 "expected_name": name, 1982 "actual_name": actual_name, 1983 }, 1984 ) 1985 1986 # Safe mode is hard-coded to True for extra protection when running in LLM agents 1987 workspace.permanently_delete_source( 1988 source=source_id, 1989 safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) 1990 ) 1991 return f"Successfully deleted source '{actual_name}' (ID: {source_id})" 1992 1993 1994@mcp_tool( 1995 destructive=True, 1996 open_world=True, 1997 extra_help_text=CLOUD_AUTH_TIP_TEXT, 1998) 1999def permanently_delete_cloud_destination( 2000 ctx: Context, 2001 destination_id: Annotated[ 2002 str, 2003 Field(description="The ID of the deployed destination to delete."), 2004 ], 2005 name: Annotated[ 2006 str, 2007 Field(description="The expected name of the destination (for verification)."), 2008 ], 2009) -> str: 2010 """Permanently delete a deployed destination connector from Airbyte Cloud. 2011 2012 IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" 2013 (case insensitive). 2014 2015 If the destination does not meet this requirement, the deletion will be rejected with a 2016 helpful error message. Instruct the user to rename the destination appropriately to authorize 2017 the deletion. 2018 2019 The provided name must match the actual name of the destination for the operation to proceed. 2020 This is a safety measure to ensure you are deleting the correct resource. 
2021 """ 2022 check_guid_created_in_session(destination_id) 2023 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2024 destination = workspace.get_destination(destination_id=destination_id) 2025 actual_name: str = cast(str, destination.name) 2026 2027 # Verify the name matches 2028 if actual_name != name: 2029 raise PyAirbyteInputError( 2030 message=( 2031 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2032 "The provided name must exactly match the destination's actual name. " 2033 "This is a safety measure to prevent accidental deletion." 2034 ), 2035 context={ 2036 "destination_id": destination_id, 2037 "expected_name": name, 2038 "actual_name": actual_name, 2039 }, 2040 ) 2041 2042 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2043 workspace.permanently_delete_destination( 2044 destination=destination_id, 2045 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2046 ) 2047 return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})" 2048 2049 2050@mcp_tool( 2051 destructive=True, 2052 open_world=True, 2053 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2054) 2055def permanently_delete_cloud_connection( 2056 ctx: Context, 2057 connection_id: Annotated[ 2058 str, 2059 Field(description="The ID of the connection to delete."), 2060 ], 2061 name: Annotated[ 2062 str, 2063 Field(description="The expected name of the connection (for verification)."), 2064 ], 2065 *, 2066 cascade_delete_source: Annotated[ 2067 bool, 2068 Field( 2069 description=( 2070 "Whether to also delete the source connector associated with this connection." 2071 ), 2072 default=False, 2073 ), 2074 ] = False, 2075 cascade_delete_destination: Annotated[ 2076 bool, 2077 Field( 2078 description=( 2079 "Whether to also delete the destination connector associated with this connection." 2080 ), 2081 default=False, 2082 ), 2083 ] = False, 2084) -> str: 2085 """Permanently delete a connection from Airbyte Cloud. 2086 2087 IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" 2088 (case insensitive). 2089 2090 If the connection does not meet this requirement, the deletion will be rejected with a 2091 helpful error message. Instruct the user to rename the connection appropriately to authorize 2092 the deletion. 2093 2094 The provided name must match the actual name of the connection for the operation to proceed. 2095 This is a safety measure to ensure you are deleting the correct resource. 2096 """ 2097 check_guid_created_in_session(connection_id) 2098 workspace: CloudWorkspace = _get_cloud_workspace(ctx) 2099 connection = workspace.get_connection(connection_id=connection_id) 2100 actual_name: str = cast(str, connection.name) 2101 2102 # Verify the name matches 2103 if actual_name != name: 2104 raise PyAirbyteInputError( 2105 message=( 2106 f"Name mismatch: expected '{name}' but found '{actual_name}'. " 2107 "The provided name must exactly match the connection's actual name. " 2108 "This is a safety measure to prevent accidental deletion." 
2109 ), 2110 context={ 2111 "connection_id": connection_id, 2112 "expected_name": name, 2113 "actual_name": actual_name, 2114 }, 2115 ) 2116 2117 # Safe mode is hard-coded to True for extra protection when running in LLM agents 2118 workspace.permanently_delete_connection( 2119 safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") 2120 connection=connection_id, 2121 cascade_delete_source=cascade_delete_source, 2122 cascade_delete_destination=cascade_delete_destination, 2123 ) 2124 return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" 2125 2126 2127@mcp_tool( 2128 open_world=True, 2129 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2130) 2131def rename_cloud_source( 2132 ctx: Context, 2133 source_id: Annotated[ 2134 str, 2135 Field(description="The ID of the deployed source to rename."), 2136 ], 2137 name: Annotated[ 2138 str, 2139 Field(description="New name for the source."), 2140 ], 2141 *, 2142 workspace_id: Annotated[ 2143 str | None, 2144 Field( 2145 description=WORKSPACE_ID_TIP_TEXT, 2146 default=None, 2147 ), 2148 ], 2149) -> str: 2150 """Rename a deployed source connector on Airbyte Cloud.""" 2151 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2152 source = workspace.get_source(source_id=source_id) 2153 source.rename(name=name) 2154 return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}" 2155 2156 2157@mcp_tool( 2158 destructive=True, 2159 open_world=True, 2160 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2161) 2162def update_cloud_source_config( 2163 ctx: Context, 2164 source_id: Annotated[ 2165 str, 2166 Field(description="The ID of the deployed source to update."), 2167 ], 2168 config: Annotated[ 2169 dict | str, 2170 Field( 2171 description="New configuration for the source connector.", 2172 ), 2173 ], 2174 config_secret_name: Annotated[ 2175 str | None, 2176 Field( 2177 description="The name of the secret containing the configuration.", 2178 default=None, 2179 ), 2180 ] = None, 2181 *, 2182 workspace_id: Annotated[ 2183 str | None, 2184 Field( 2185 description=WORKSPACE_ID_TIP_TEXT, 2186 default=None, 2187 ), 2188 ], 2189) -> str: 2190 """Update a deployed source connector's configuration on Airbyte Cloud. 2191 2192 This is a destructive operation that can break existing connections if the 2193 configuration is changed incorrectly. Use with caution. 2194 """ 2195 check_guid_created_in_session(source_id) 2196 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2197 source = workspace.get_source(source_id=source_id) 2198 2199 config_dict = resolve_connector_config( 2200 config=config, 2201 config_secret_name=config_secret_name, 2202 config_spec_jsonschema=None, # We don't have the spec here 2203 ) 2204 2205 source.update_config(config=config_dict) 2206 return f"Successfully updated source '{source_id}'. 
URL: {source.connector_url}" 2207 2208 2209@mcp_tool( 2210 open_world=True, 2211 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2212) 2213def rename_cloud_destination( 2214 ctx: Context, 2215 destination_id: Annotated[ 2216 str, 2217 Field(description="The ID of the deployed destination to rename."), 2218 ], 2219 name: Annotated[ 2220 str, 2221 Field(description="New name for the destination."), 2222 ], 2223 *, 2224 workspace_id: Annotated[ 2225 str | None, 2226 Field( 2227 description=WORKSPACE_ID_TIP_TEXT, 2228 default=None, 2229 ), 2230 ], 2231) -> str: 2232 """Rename a deployed destination connector on Airbyte Cloud.""" 2233 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2234 destination = workspace.get_destination(destination_id=destination_id) 2235 destination.rename(name=name) 2236 return ( 2237 f"Successfully renamed destination '{destination_id}' to '{name}'. " 2238 f"URL: {destination.connector_url}" 2239 ) 2240 2241 2242@mcp_tool( 2243 destructive=True, 2244 open_world=True, 2245 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2246) 2247def update_cloud_destination_config( 2248 ctx: Context, 2249 destination_id: Annotated[ 2250 str, 2251 Field(description="The ID of the deployed destination to update."), 2252 ], 2253 config: Annotated[ 2254 dict | str, 2255 Field( 2256 description="New configuration for the destination connector.", 2257 ), 2258 ], 2259 config_secret_name: Annotated[ 2260 str | None, 2261 Field( 2262 description="The name of the secret containing the configuration.", 2263 default=None, 2264 ), 2265 ], 2266 *, 2267 workspace_id: Annotated[ 2268 str | None, 2269 Field( 2270 description=WORKSPACE_ID_TIP_TEXT, 2271 default=None, 2272 ), 2273 ], 2274) -> str: 2275 """Update a deployed destination connector's configuration on Airbyte Cloud. 2276 2277 This is a destructive operation that can break existing connections if the 2278 configuration is changed incorrectly. Use with caution. 2279 """ 2280 check_guid_created_in_session(destination_id) 2281 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2282 destination = workspace.get_destination(destination_id=destination_id) 2283 2284 config_dict = resolve_connector_config( 2285 config=config, 2286 config_secret_name=config_secret_name, 2287 config_spec_jsonschema=None, # We don't have the spec here 2288 ) 2289 2290 destination.update_config(config=config_dict) 2291 return ( 2292 f"Successfully updated destination '{destination_id}'. " f"URL: {destination.connector_url}" 2293 ) 2294 2295 2296@mcp_tool( 2297 open_world=True, 2298 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2299) 2300def rename_cloud_connection( 2301 ctx: Context, 2302 connection_id: Annotated[ 2303 str, 2304 Field(description="The ID of the connection to rename."), 2305 ], 2306 name: Annotated[ 2307 str, 2308 Field(description="New name for the connection."), 2309 ], 2310 *, 2311 workspace_id: Annotated[ 2312 str | None, 2313 Field( 2314 description=WORKSPACE_ID_TIP_TEXT, 2315 default=None, 2316 ), 2317 ], 2318) -> str: 2319 """Rename a connection on Airbyte Cloud.""" 2320 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2321 connection = workspace.get_connection(connection_id=connection_id) 2322 connection.rename(name=name) 2323 return ( 2324 f"Successfully renamed connection '{connection_id}' to '{name}'. 
" 2325 f"URL: {connection.connection_url}" 2326 ) 2327 2328 2329@mcp_tool( 2330 destructive=True, 2331 open_world=True, 2332 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2333) 2334def set_cloud_connection_table_prefix( 2335 ctx: Context, 2336 connection_id: Annotated[ 2337 str, 2338 Field(description="The ID of the connection to update."), 2339 ], 2340 prefix: Annotated[ 2341 str, 2342 Field(description="New table prefix to use when syncing to the destination."), 2343 ], 2344 *, 2345 workspace_id: Annotated[ 2346 str | None, 2347 Field( 2348 description=WORKSPACE_ID_TIP_TEXT, 2349 default=None, 2350 ), 2351 ], 2352) -> str: 2353 """Set the table prefix for a connection on Airbyte Cloud. 2354 2355 This is a destructive operation that can break downstream dependencies if the 2356 table prefix is changed incorrectly. Use with caution. 2357 """ 2358 check_guid_created_in_session(connection_id) 2359 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2360 connection = workspace.get_connection(connection_id=connection_id) 2361 connection.set_table_prefix(prefix=prefix) 2362 return ( 2363 f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. " 2364 f"URL: {connection.connection_url}" 2365 ) 2366 2367 2368@mcp_tool( 2369 destructive=True, 2370 open_world=True, 2371 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2372) 2373def set_cloud_connection_selected_streams( 2374 ctx: Context, 2375 connection_id: Annotated[ 2376 str, 2377 Field(description="The ID of the connection to update."), 2378 ], 2379 stream_names: Annotated[ 2380 str | list[str], 2381 Field( 2382 description=( 2383 "The selected stream names to sync within the connection. " 2384 "Must be an explicit stream name or list of streams." 2385 ) 2386 ), 2387 ], 2388 *, 2389 workspace_id: Annotated[ 2390 str | None, 2391 Field( 2392 description=WORKSPACE_ID_TIP_TEXT, 2393 default=None, 2394 ), 2395 ], 2396) -> str: 2397 """Set the selected streams for a connection on Airbyte Cloud. 2398 2399 This is a destructive operation that can break existing connections if the 2400 stream selection is changed incorrectly. Use with caution. 2401 """ 2402 check_guid_created_in_session(connection_id) 2403 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2404 connection = workspace.get_connection(connection_id=connection_id) 2405 2406 resolved_streams_list: list[str] = resolve_list_of_strings(stream_names) 2407 connection.set_selected_streams(stream_names=resolved_streams_list) 2408 2409 return ( 2410 f"Successfully set selected streams for connection '{connection_id}' " 2411 f"to {resolved_streams_list}. URL: {connection.connection_url}" 2412 ) 2413 2414 2415@mcp_tool( 2416 open_world=True, 2417 destructive=True, 2418 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2419) 2420def update_cloud_connection( 2421 ctx: Context, 2422 connection_id: Annotated[ 2423 str, 2424 Field(description="The ID of the connection to update."), 2425 ], 2426 *, 2427 enabled: Annotated[ 2428 bool | None, 2429 Field( 2430 description=( 2431 "Set the connection's enabled status. " 2432 "True enables the connection (status='active'), " 2433 "False disables it (status='inactive'). " 2434 "Leave unset to keep the current status." 2435 ), 2436 default=None, 2437 ), 2438 ], 2439 cron_expression: Annotated[ 2440 str | None, 2441 Field( 2442 description=( 2443 "A cron expression defining when syncs should run. " 2444 "Examples: '0 0 * * *' (daily at midnight UTC), " 2445 "'0 */6 * * *' (every 6 hours), " 2446 "'0 0 * * 0' (weekly on Sunday at midnight UTC). 
" 2447 "Leave unset to keep the current schedule. " 2448 "Cannot be used together with 'manual_schedule'." 2449 ), 2450 default=None, 2451 ), 2452 ], 2453 manual_schedule: Annotated[ 2454 bool | None, 2455 Field( 2456 description=( 2457 "Set to True to disable automatic syncs (manual scheduling only). " 2458 "Syncs will only run when manually triggered. " 2459 "Cannot be used together with 'cron_expression'." 2460 ), 2461 default=None, 2462 ), 2463 ], 2464 workspace_id: Annotated[ 2465 str | None, 2466 Field( 2467 description=WORKSPACE_ID_TIP_TEXT, 2468 default=None, 2469 ), 2470 ], 2471) -> str: 2472 """Update a connection's settings on Airbyte Cloud. 2473 2474 This tool allows updating multiple connection settings in a single call: 2475 - Enable or disable the connection 2476 - Set a cron schedule for automatic syncs 2477 - Switch to manual scheduling (no automatic syncs) 2478 2479 At least one setting must be provided. The 'cron_expression' and 'manual_schedule' 2480 parameters are mutually exclusive. 2481 """ 2482 check_guid_created_in_session(connection_id) 2483 2484 # Validate that at least one setting is provided 2485 if enabled is None and cron_expression is None and manual_schedule is None: 2486 raise ValueError( 2487 "At least one setting must be provided: 'enabled', 'cron_expression', " 2488 "or 'manual_schedule'." 2489 ) 2490 2491 # Validate mutually exclusive schedule options 2492 if cron_expression is not None and manual_schedule is True: 2493 raise ValueError( 2494 "Cannot specify both 'cron_expression' and 'manual_schedule=True'. " 2495 "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' " 2496 "for manual-only syncs." 2497 ) 2498 2499 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2500 connection = workspace.get_connection(connection_id=connection_id) 2501 2502 changes_made: list[str] = [] 2503 2504 # Apply enabled status change 2505 if enabled is not None: 2506 connection.set_enabled(enabled=enabled) 2507 status_str = "enabled" if enabled else "disabled" 2508 changes_made.append(f"status set to '{status_str}'") 2509 2510 # Apply schedule change 2511 if cron_expression is not None: 2512 connection.set_schedule(cron_expression=cron_expression) 2513 changes_made.append(f"schedule set to '{cron_expression}'") 2514 elif manual_schedule is True: 2515 connection.set_manual_schedule() 2516 changes_made.append("schedule set to 'manual'") 2517 2518 changes_summary = ", ".join(changes_made) 2519 return ( 2520 f"Successfully updated connection '{connection_id}': {changes_summary}. " 2521 f"URL: {connection.connection_url}" 2522 ) 2523 2524 2525@mcp_tool( 2526 read_only=True, 2527 idempotent=True, 2528 open_world=True, 2529 extra_help_text=CLOUD_AUTH_TIP_TEXT, 2530) 2531def get_connection_artifact( 2532 ctx: Context, 2533 connection_id: Annotated[ 2534 str, 2535 Field(description="The ID of the Airbyte Cloud connection."), 2536 ], 2537 artifact_type: Annotated[ 2538 Literal["state", "catalog"], 2539 Field(description="The type of artifact to retrieve: 'state' or 'catalog'."), 2540 ], 2541 *, 2542 workspace_id: Annotated[ 2543 str | None, 2544 Field( 2545 description=WORKSPACE_ID_TIP_TEXT, 2546 default=None, 2547 ), 2548 ], 2549) -> dict[str, Any]: 2550 """Get a connection artifact (state or catalog) from Airbyte Cloud. 2551 2552 Retrieves the specified artifact for a connection: 2553 - 'state': Returns the full raw connection state including stateType and all 2554 state data, or {"ERROR": "..."} if no state is set. 
2555 - 'catalog': Returns the configured catalog (syncCatalog) as a dict, 2556 or {"ERROR": "..."} if not found. 2557 """ 2558 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 2559 connection = workspace.get_connection(connection_id=connection_id) 2560 2561 if artifact_type == "state": 2562 result = connection.dump_raw_state() 2563 if result.get("stateType") == "not_set": 2564 return {"ERROR": "No state is set for this connection (stateType: not_set)"} 2565 return result 2566 2567 # artifact_type == "catalog" 2568 result = connection.dump_raw_catalog() 2569 if result is None: 2570 return {"ERROR": "No catalog found for this connection"} 2571 return result 2572 2573 2574def register_cloud_tools(app: FastMCP) -> None: 2575 """Register cloud tools with the FastMCP app. 2576 2577 Args: 2578 app: FastMCP application instance 2579 """ 2580 register_mcp_tools( 2581 app, 2582 mcp_module=__name__, 2583 exclude_args=["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None, 2584 )
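For reference, a minimal sketch of wiring these tools into a FastMCP server. The app name is illustrative; per the registration code above, `register_cloud_tools` excludes the `workspace_id` argument from the registered tools when `AIRBYTE_CLOUD_WORKSPACE_ID` is set.

from fastmcp import FastMCP

from airbyte.mcp.cloud import register_cloud_tools

app = FastMCP("airbyte-cloud-example")  # server name is illustrative
register_cloud_tools(app)  # drops `workspace_id` args when AIRBYTE_CLOUD_WORKSPACE_ID is set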
44class CloudSourceResult(BaseModel): 45 """Information about a deployed source connector in Airbyte Cloud.""" 46 47 id: str 48 """The source ID.""" 49 name: str 50 """Display name of the source.""" 51 url: str 52 """Web URL for managing this source in Airbyte Cloud."""
Information about a deployed source connector in Airbyte Cloud.
55class CloudDestinationResult(BaseModel): 56 """Information about a deployed destination connector in Airbyte Cloud.""" 57 58 id: str 59 """The destination ID.""" 60 name: str 61 """Display name of the destination.""" 62 url: str 63 """Web URL for managing this destination in Airbyte Cloud."""
Information about a deployed destination connector in Airbyte Cloud.
66class CloudConnectionResult(BaseModel): 67 """Information about a deployed connection in Airbyte Cloud.""" 68 69 id: str 70 """The connection ID.""" 71 name: str 72 """Display name of the connection.""" 73 url: str 74 """Web URL for managing this connection in Airbyte Cloud.""" 75 source_id: str 76 """ID of the source used by this connection.""" 77 destination_id: str 78 """ID of the destination used by this connection.""" 79 last_job_status: str | None = None 80 """Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). 81 Only populated when with_connection_status=True.""" 82 last_job_id: int | None = None 83 """Job ID of the most recent completed sync. Only populated when with_connection_status=True.""" 84 last_job_time: str | None = None 85 """ISO 8601 timestamp of the most recent completed sync. 86 Only populated when with_connection_status=True.""" 87 currently_running_job_id: int | None = None 88 """Job ID of a currently running sync, if any. 89 Only populated when with_connection_status=True.""" 90 currently_running_job_start_time: str | None = None 91 """ISO 8601 timestamp of when the currently running sync started. 92 Only populated when with_connection_status=True."""
Information about a deployed connection in Airbyte Cloud.
Status of the most recent completed sync job (e.g., 'succeeded', 'failed', 'cancelled'). Only populated when with_connection_status=True.
Job ID of the most recent completed sync. Only populated when with_connection_status=True.
ISO 8601 timestamp of the most recent completed sync. Only populated when with_connection_status=True.
95class CloudSourceDetails(BaseModel): 96 """Detailed information about a deployed source connector in Airbyte Cloud.""" 97 98 source_id: str 99 """The source ID.""" 100 source_name: str 101 """Display name of the source.""" 102 source_url: str 103 """Web URL for managing this source in Airbyte Cloud.""" 104 connector_definition_id: str 105 """The connector definition ID (e.g., the ID for 'source-postgres')."""
Detailed information about a deployed source connector in Airbyte Cloud.
108class CloudDestinationDetails(BaseModel): 109 """Detailed information about a deployed destination connector in Airbyte Cloud.""" 110 111 destination_id: str 112 """The destination ID.""" 113 destination_name: str 114 """Display name of the destination.""" 115 destination_url: str 116 """Web URL for managing this destination in Airbyte Cloud.""" 117 connector_definition_id: str 118 """The connector definition ID (e.g., the ID for 'destination-snowflake')."""
Detailed information about a deployed destination connector in Airbyte Cloud.
121class CloudConnectionDetails(BaseModel): 122 """Detailed information about a deployed connection in Airbyte Cloud.""" 123 124 connection_id: str 125 """The connection ID.""" 126 connection_name: str 127 """Display name of the connection.""" 128 connection_url: str 129 """Web URL for managing this connection in Airbyte Cloud.""" 130 source_id: str 131 """ID of the source used by this connection.""" 132 source_name: str 133 """Display name of the source.""" 134 destination_id: str 135 """ID of the destination used by this connection.""" 136 destination_name: str 137 """Display name of the destination.""" 138 selected_streams: list[str] 139 """List of stream names selected for syncing.""" 140 table_prefix: str | None 141 """Table prefix applied when syncing to the destination."""
Detailed information about a deployed connection in Airbyte Cloud.
144class CloudOrganizationResult(BaseModel): 145 """Information about an organization in Airbyte Cloud.""" 146 147 id: str 148 """The organization ID.""" 149 name: str 150 """Display name of the organization.""" 151 email: str 152 """Email associated with the organization.""" 153 payment_status: str | None = None 154 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 155 When 'disabled', syncs are blocked due to unpaid invoices.""" 156 subscription_status: str | None = None 157 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 158 'unsubscribed').""" 159 is_account_locked: bool = False 160 """Whether the account is locked due to billing issues. 161 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 162 Defaults to False unless we have affirmative evidence of a locked state."""
Information about an organization in Airbyte Cloud.
Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices.
165class CloudWorkspaceResult(BaseModel): 166 """Information about a workspace in Airbyte Cloud.""" 167 168 workspace_id: str 169 """The workspace ID.""" 170 workspace_name: str 171 """Display name of the workspace.""" 172 workspace_url: str | None = None 173 """URL to access the workspace in Airbyte Cloud.""" 174 organization_id: str 175 """ID of the organization (requires ORGANIZATION_READER permission).""" 176 organization_name: str | None = None 177 """Name of the organization (requires ORGANIZATION_READER permission).""" 178 payment_status: str | None = None 179 """Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). 180 When 'disabled', syncs are blocked due to unpaid invoices. 181 Requires ORGANIZATION_READER permission.""" 182 subscription_status: str | None = None 183 """Subscription status of the organization (e.g., 'pre_subscription', 'subscribed', 184 'unsubscribed'). Requires ORGANIZATION_READER permission.""" 185 is_account_locked: bool = False 186 """Whether the account is locked due to billing issues. 187 True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. 188 Defaults to False unless we have affirmative evidence of a locked state. 189 Requires ORGANIZATION_READER permission."""
Information about a workspace in Airbyte Cloud.
ID of the organization (requires ORGANIZATION_READER permission).
Name of the organization (requires ORGANIZATION_READER permission).
Payment status of the organization (e.g., 'okay', 'grace_period', 'disabled', 'locked'). When 'disabled', syncs are blocked due to unpaid invoices. Requires ORGANIZATION_READER permission.
192class LogReadResult(BaseModel): 193 """Result of reading sync logs with pagination support.""" 194 195 job_id: int 196 """The job ID the logs belong to.""" 197 attempt_number: int 198 """The attempt number the logs belong to.""" 199 log_text: str 200 """The log text returned in this response.""" 201 log_text_start_line: int 202 """1-based line index of the first line returned.""" 203 log_text_line_count: int 204 """Number of lines returned in this response.""" 205 total_log_lines_available: int 206 """Total number of log lines available; indicates whether any lines were omitted due to the limit."""
Result of reading sync logs with pagination support.
209class SyncJobResult(BaseModel): 210 """Information about a sync job.""" 211 212 job_id: int 213 """The job ID.""" 214 status: str 215 """The job status (e.g., 'succeeded', 'failed', 'running', 'pending').""" 216 bytes_synced: int 217 """Number of bytes synced in this job.""" 218 records_synced: int 219 """Number of records synced in this job.""" 220 start_time: str 221 """ISO 8601 timestamp of when the job started.""" 222 job_url: str 223 """URL to view the job in Airbyte Cloud."""
Information about a sync job.
226class SyncJobListResult(BaseModel): 227 """Result of listing sync jobs with pagination support.""" 228 229 jobs: list[SyncJobResult] 230 """List of sync jobs.""" 231 jobs_count: int 232 """Number of jobs returned in this response.""" 233 jobs_offset: int 234 """Offset used for this request (0 if not specified).""" 235 from_tail: bool 236 """Whether jobs are ordered newest-first (True) or oldest-first (False)."""
Result of listing sync jobs with pagination support.
274@mcp_tool( 275 open_world=True, 276 extra_help_text=CLOUD_AUTH_TIP_TEXT, 277) 278def deploy_source_to_cloud( 279 ctx: Context, 280 source_name: Annotated[ 281 str, 282 Field(description="The name to use when deploying the source."), 283 ], 284 source_connector_name: Annotated[ 285 str, 286 Field(description="The name of the source connector (e.g., 'source-faker')."), 287 ], 288 *, 289 workspace_id: Annotated[ 290 str | None, 291 Field( 292 description=WORKSPACE_ID_TIP_TEXT, 293 default=None, 294 ), 295 ], 296 config: Annotated[ 297 dict | str | None, 298 Field( 299 description="The configuration for the source connector.", 300 default=None, 301 ), 302 ], 303 config_secret_name: Annotated[ 304 str | None, 305 Field( 306 description="The name of the secret containing the configuration.", 307 default=None, 308 ), 309 ], 310 unique: Annotated[ 311 bool, 312 Field( 313 description="Whether to require a unique name.", 314 default=True, 315 ), 316 ], 317) -> str: 318 """Deploy a source connector to Airbyte Cloud.""" 319 source = get_source( 320 source_connector_name, 321 no_executor=True, 322 ) 323 config_dict = resolve_connector_config( 324 config=config, 325 config_secret_name=config_secret_name, 326 config_spec_jsonschema=source.config_spec, 327 ) 328 source.set_config(config_dict, validate=True) 329 330 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 331 deployed_source = workspace.deploy_source( 332 name=source_name, 333 source=source, 334 unique=unique, 335 ) 336 337 register_guid_created_in_session(deployed_source.connector_id) 338 return ( 339 f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" 340 f" and URL: {deployed_source.connector_url}" 341 )
Deploy a source connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
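A rough sketch of the PyAirbyte calls this tool wraps, assuming `workspace` is an authenticated `airbyte.cloud.CloudWorkspace` (the same object the module's `_get_cloud_workspace` helper returns); the connector name and config values are illustrative.

from airbyte import get_source

# Build a local, no-executor source object and validate the config against its spec.
source = get_source("source-faker", no_executor=True)
source.set_config({"count": 100}, validate=True)  # illustrative config

# Deploy it to the workspace; unique=True requires a unique name.
deployed = workspace.deploy_source(name="My Faker Source", source=source, unique=True)
print(deployed.connector_id, deployed.connector_url)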
344@mcp_tool( 345 open_world=True, 346 extra_help_text=CLOUD_AUTH_TIP_TEXT, 347) 348def deploy_destination_to_cloud( 349 ctx: Context, 350 destination_name: Annotated[ 351 str, 352 Field(description="The name to use when deploying the destination."), 353 ], 354 destination_connector_name: Annotated[ 355 str, 356 Field(description="The name of the destination connector (e.g., 'destination-postgres')."), 357 ], 358 *, 359 workspace_id: Annotated[ 360 str | None, 361 Field( 362 description=WORKSPACE_ID_TIP_TEXT, 363 default=None, 364 ), 365 ], 366 config: Annotated[ 367 dict | str | None, 368 Field( 369 description="The configuration for the destination connector.", 370 default=None, 371 ), 372 ], 373 config_secret_name: Annotated[ 374 str | None, 375 Field( 376 description="The name of the secret containing the configuration.", 377 default=None, 378 ), 379 ], 380 unique: Annotated[ 381 bool, 382 Field( 383 description="Whether to require a unique name.", 384 default=True, 385 ), 386 ], 387) -> str: 388 """Deploy a destination connector to Airbyte Cloud.""" 389 destination = get_destination( 390 destination_connector_name, 391 no_executor=True, 392 ) 393 config_dict = resolve_connector_config( 394 config=config, 395 config_secret_name=config_secret_name, 396 config_spec_jsonschema=destination.config_spec, 397 ) 398 destination.set_config(config_dict, validate=True) 399 400 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 401 deployed_destination = workspace.deploy_destination( 402 name=destination_name, 403 destination=destination, 404 unique=unique, 405 ) 406 407 register_guid_created_in_session(deployed_destination.connector_id) 408 return ( 409 f"Successfully deployed destination '{destination_name}' " 410 f"with ID: {deployed_destination.connector_id}" 411 )
Deploy a destination connector to Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
414@mcp_tool( 415 open_world=True, 416 extra_help_text=CLOUD_AUTH_TIP_TEXT, 417) 418def create_connection_on_cloud( 419 ctx: Context, 420 connection_name: Annotated[ 421 str, 422 Field(description="The name of the connection."), 423 ], 424 source_id: Annotated[ 425 str, 426 Field(description="The ID of the deployed source."), 427 ], 428 destination_id: Annotated[ 429 str, 430 Field(description="The ID of the deployed destination."), 431 ], 432 selected_streams: Annotated[ 433 str | list[str], 434 Field( 435 description=( 436 "The selected stream names to sync within the connection. " 437 "Must be an explicit stream name or list of streams. " 438 "Cannot be empty or '*'." 439 ) 440 ), 441 ], 442 *, 443 workspace_id: Annotated[ 444 str | None, 445 Field( 446 description=WORKSPACE_ID_TIP_TEXT, 447 default=None, 448 ), 449 ], 450 table_prefix: Annotated[ 451 str | None, 452 Field( 453 description="Optional table prefix to use when syncing to the destination.", 454 default=None, 455 ), 456 ], 457) -> str: 458 """Create a connection between a deployed source and destination on Airbyte Cloud.""" 459 resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) 460 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 461 deployed_connection = workspace.deploy_connection( 462 connection_name=connection_name, 463 source=source_id, 464 destination=destination_id, 465 selected_streams=resolved_streams_list, 466 table_prefix=table_prefix, 467 ) 468 469 register_guid_created_in_session(deployed_connection.connection_id) 470 return ( 471 f"Successfully created connection '{connection_name}' " 472 f"with ID '{deployed_connection.connection_id}' and " 473 f"URL: {deployed_connection.connection_url}" 474 )
Create a connection between a deployed source and destination on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
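In sketch form, the wrapped `deploy_connection` call; the IDs and stream names below are placeholders, and `workspace` is assumed to be an authenticated `CloudWorkspace`.

connection = workspace.deploy_connection(
    connection_name="faker-to-noop",          # illustrative name
    source="<deployed-source-id>",            # placeholder
    destination="<deployed-destination-id>",  # placeholder
    selected_streams=["users", "products"],   # explicit stream names; '*' is not accepted
    table_prefix=None,
)
print(connection.connection_id, connection.connection_url)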
477@mcp_tool( 478 open_world=True, 479 extra_help_text=CLOUD_AUTH_TIP_TEXT, 480) 481def run_cloud_sync( 482 ctx: Context, 483 connection_id: Annotated[ 484 str, 485 Field(description="The ID of the Airbyte Cloud connection."), 486 ], 487 *, 488 workspace_id: Annotated[ 489 str | None, 490 Field( 491 description=WORKSPACE_ID_TIP_TEXT, 492 default=None, 493 ), 494 ], 495 wait: Annotated[ 496 bool, 497 Field( 498 description=( 499 "Whether to wait for the sync to complete. Since a sync can take between several " 500 "minutes and several hours, this option is not recommended for most " 501 "scenarios." 502 ), 503 default=False, 504 ), 505 ], 506 wait_timeout: Annotated[ 507 int, 508 Field( 509 description="Maximum time to wait for sync completion (seconds).", 510 default=300, 511 ), 512 ], 513) -> str: 514 """Run a sync job on Airbyte Cloud.""" 515 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 516 connection = workspace.get_connection(connection_id=connection_id) 517 sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) 518 519 if wait: 520 status = sync_result.get_job_status() 521 return ( 522 f"Sync completed with status: {status}. " 523 f"Job ID is '{sync_result.job_id}' and " 524 f"job URL is: {sync_result.job_url}" 525 ) 526 return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}"
Run a sync job on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
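Sketch of the underlying call, assuming `workspace` is an authenticated `CloudWorkspace`; as the tool's parameter notes, waiting synchronously is usually not recommended for long-running syncs.

connection = workspace.get_connection(connection_id="<connection-id>")  # placeholder ID
sync_result = connection.run_sync(wait=False)  # start the sync and return immediately
print(sync_result.job_id, sync_result.job_url)

# Or block until completion (bounded by wait_timeout, in seconds):
# sync_result = connection.run_sync(wait=True, wait_timeout=300)
# print(sync_result.get_job_status())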
529@mcp_tool( 530 read_only=True, 531 idempotent=True, 532 open_world=True, 533 extra_help_text=CLOUD_AUTH_TIP_TEXT, 534) 535def check_airbyte_cloud_workspace( 536 ctx: Context, 537 *, 538 workspace_id: Annotated[ 539 str | None, 540 Field( 541 description=WORKSPACE_ID_TIP_TEXT, 542 default=None, 543 ), 544 ], 545) -> CloudWorkspaceResult: 546 """Check if we have a valid Airbyte Cloud connection and return workspace info. 547 548 Returns workspace details including workspace ID, name, organization info, and billing status. 549 """ 550 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 551 552 # Get workspace details from the public API using workspace's credentials 553 workspace_response = api_util.get_workspace( 554 workspace_id=workspace.workspace_id, 555 api_root=workspace.api_root, 556 client_id=workspace.client_id, 557 client_secret=workspace.client_secret, 558 bearer_token=workspace.bearer_token, 559 ) 560 561 # Try to get organization info (including billing), but fail gracefully if we don't have 562 # permissions. Fetching organization info requires ORGANIZATION_READER permissions on the 563 # organization, which may not be available with workspace-scoped credentials. 564 organization = workspace.get_organization(raise_on_error=False) 565 566 return CloudWorkspaceResult( 567 workspace_id=workspace_response.workspace_id, 568 workspace_name=workspace_response.name, 569 workspace_url=workspace.workspace_url, 570 organization_id=( 571 organization.organization_id 572 if organization 573 else "[unavailable - requires ORGANIZATION_READER permission]" 574 ), 575 organization_name=organization.organization_name if organization else None, 576 payment_status=organization.payment_status if organization else None, 577 subscription_status=organization.subscription_status if organization else None, 578 is_account_locked=organization.is_account_locked if organization else False, 579 )
Check if we have a valid Airbyte Cloud connection and return workspace info.
Returns workspace details including workspace ID, name, organization info, and billing status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
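The billing fields come from an optional organization lookup that fails gracefully without ORGANIZATION_READER permission; a sketch of that check, assuming `workspace` is an authenticated `CloudWorkspace`.

org = workspace.get_organization(raise_on_error=False)  # None if permission is missing
if org is None:
    print("Organization/billing info unavailable with these credentials.")
elif org.is_account_locked:
    print(f"Account locked for organization '{org.organization_name}'; syncs may be blocked.")
else:
    print(f"Payment status: {org.payment_status}, subscription: {org.subscription_status}")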
582@mcp_tool( 583 open_world=True, 584 extra_help_text=CLOUD_AUTH_TIP_TEXT, 585) 586def deploy_noop_destination_to_cloud( 587 ctx: Context, 588 name: str = "No-op Destination", 589 *, 590 workspace_id: Annotated[ 591 str | None, 592 Field( 593 description=WORKSPACE_ID_TIP_TEXT, 594 default=None, 595 ), 596 ], 597 unique: bool = True, 598) -> str: 599 """Deploy the No-op destination to Airbyte Cloud for testing purposes.""" 600 destination = get_noop_destination() 601 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 602 deployed_destination = workspace.deploy_destination( 603 name=name, 604 destination=destination, 605 unique=unique, 606 ) 607 register_guid_created_in_session(deployed_destination.connector_id) 608 return ( 609 f"Successfully deployed No-op Destination " 610 f"with ID '{deployed_destination.connector_id}' and " 611 f"URL: {deployed_destination.connector_url}" 612 )
Deploy the No-op destination to Airbyte Cloud for testing purposes.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
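Sketch of the equivalent direct calls for deploying PyAirbyte's built-in No-op destination for testing, again assuming an authenticated `workspace`.

from airbyte.destinations.util import get_noop_destination

destination = get_noop_destination()
deployed = workspace.deploy_destination(
    name="No-op Destination",  # default name used by the tool
    destination=destination,
    unique=True,
)
print(deployed.connector_id, deployed.connector_url)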
615@mcp_tool( 616 read_only=True, 617 idempotent=True, 618 open_world=True, 619 extra_help_text=CLOUD_AUTH_TIP_TEXT, 620) 621def get_cloud_sync_status( 622 ctx: Context, 623 connection_id: Annotated[ 624 str, 625 Field( 626 description="The ID of the Airbyte Cloud connection.", 627 ), 628 ], 629 job_id: Annotated[ 630 int | None, 631 Field( 632 description="Optional job ID. If not provided, the latest job will be used.", 633 default=None, 634 ), 635 ], 636 *, 637 workspace_id: Annotated[ 638 str | None, 639 Field( 640 description=WORKSPACE_ID_TIP_TEXT, 641 default=None, 642 ), 643 ], 644 include_attempts: Annotated[ 645 bool, 646 Field( 647 description="Whether to include detailed attempts information.", 648 default=False, 649 ), 650 ], 651) -> dict[str, Any]: 652 """Get the status of a sync job from the Airbyte Cloud.""" 653 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 654 connection = workspace.get_connection(connection_id=connection_id) 655 656 # If a job ID is provided, get the job by ID. 657 sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) 658 659 if not sync_result: 660 return {"status": None, "job_id": None, "attempts": []} 661 662 result = { 663 "status": sync_result.get_job_status(), 664 "job_id": sync_result.job_id, 665 "bytes_synced": sync_result.bytes_synced, 666 "records_synced": sync_result.records_synced, 667 "start_time": sync_result.start_time.isoformat(), 668 "job_url": sync_result.job_url, 669 "attempts": [], 670 } 671 672 if include_attempts: 673 attempts = sync_result.get_attempts() 674 result["attempts"] = [ 675 { 676 "attempt_number": attempt.attempt_number, 677 "attempt_id": attempt.attempt_id, 678 "status": attempt.status, 679 "bytes_synced": attempt.bytes_synced, 680 "records_synced": attempt.records_synced, 681 "created_at": attempt.created_at.isoformat(), 682 } 683 for attempt in attempts 684 ] 685 686 return result
Get the status of a sync job from the Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
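Sketch of the lookup the tool performs, assuming `connection` was obtained via `workspace.get_connection(...)`.

sync_result = connection.get_sync_result(job_id=None)  # None -> most recent job
if sync_result is None:
    print("No sync jobs found for this connection.")
else:
    print(sync_result.get_job_status(), sync_result.bytes_synced, sync_result.records_synced)
    for attempt in sync_result.get_attempts():  # only needed when attempt detail is wanted
        print(attempt.attempt_number, attempt.status, attempt.records_synced)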
689@mcp_tool( 690 read_only=True, 691 idempotent=True, 692 open_world=True, 693 extra_help_text=CLOUD_AUTH_TIP_TEXT, 694) 695def list_cloud_sync_jobs( 696 ctx: Context, 697 connection_id: Annotated[ 698 str, 699 Field(description="The ID of the Airbyte Cloud connection."), 700 ], 701 *, 702 workspace_id: Annotated[ 703 str | None, 704 Field( 705 description=WORKSPACE_ID_TIP_TEXT, 706 default=None, 707 ), 708 ], 709 max_jobs: Annotated[ 710 int, 711 Field( 712 description=( 713 "Maximum number of jobs to return. " 714 "Defaults to 20 if not specified. " 715 "Maximum allowed value is 500." 716 ), 717 default=20, 718 ), 719 ], 720 from_tail: Annotated[ 721 bool | None, 722 Field( 723 description=( 724 "When True, jobs are ordered newest-first (createdAt DESC). " 725 "When False, jobs are ordered oldest-first (createdAt ASC). " 726 "Defaults to True if `jobs_offset` is not specified. " 727 "Cannot combine `from_tail=True` with `jobs_offset`." 728 ), 729 default=None, 730 ), 731 ], 732 jobs_offset: Annotated[ 733 int | None, 734 Field( 735 description=( 736 "Number of jobs to skip from the beginning. " 737 "Cannot be combined with `from_tail=True`." 738 ), 739 default=None, 740 ), 741 ], 742 job_type: Annotated[ 743 JobTypeEnum | None, 744 Field( 745 description=( 746 "Filter by job type. Options: 'sync', 'reset', 'refresh', 'clear'. " 747 "If not specified, defaults to sync and reset jobs only (API default). " 748 "Use 'refresh' to find refresh jobs or 'clear' to find clear jobs." 749 ), 750 default=None, 751 ), 752 ], 753) -> SyncJobListResult: 754 """List sync jobs for a connection with pagination support. 755 756 This tool allows you to retrieve a list of sync jobs for a connection, 757 with control over ordering and pagination. By default, jobs are returned 758 newest-first (from_tail=True). 759 """ 760 # Validate that jobs_offset and from_tail are not both set 761 if jobs_offset is not None and from_tail is True: 762 raise PyAirbyteInputError( 763 message="Cannot specify both 'jobs_offset' and 'from_tail=True' parameters.", 764 context={"jobs_offset": jobs_offset, "from_tail": from_tail}, 765 ) 766 767 # Default to from_tail=True if neither is specified 768 if from_tail is None and jobs_offset is None: 769 from_tail = True 770 elif from_tail is None: 771 from_tail = False 772 773 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 774 connection = workspace.get_connection(connection_id=connection_id) 775 776 # Cap at 500 to avoid overloading agent context 777 effective_limit = min(max_jobs, 500) if max_jobs > 0 else 20 778 779 sync_results = connection.get_previous_sync_logs( 780 limit=effective_limit, 781 offset=jobs_offset, 782 from_tail=from_tail, 783 job_type=job_type, 784 ) 785 786 jobs = [ 787 SyncJobResult( 788 job_id=sync_result.job_id, 789 status=str(sync_result.get_job_status()), 790 bytes_synced=sync_result.bytes_synced, 791 records_synced=sync_result.records_synced, 792 start_time=sync_result.start_time.isoformat(), 793 job_url=sync_result.job_url, 794 ) 795 for sync_result in sync_results 796 ] 797 798 return SyncJobListResult( 799 jobs=jobs, 800 jobs_count=len(jobs), 801 jobs_offset=jobs_offset or 0, 802 from_tail=from_tail, 803 )
List sync jobs for a connection with pagination support.
This tool allows you to retrieve a list of sync jobs for a connection, with control over ordering and pagination. By default, jobs are returned newest-first (from_tail=True).
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
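A sketch of paging oldest-first through a connection's job history with the wrapped `get_previous_sync_logs` call (the page size is illustrative; offsets cannot be combined with `from_tail=True`).

offset = 0
while True:
    page = connection.get_previous_sync_logs(
        limit=20,          # illustrative page size
        offset=offset,
        from_tail=False,   # oldest-first when paging by offset
        job_type=None,     # or a JobTypeEnum value such as 'refresh' or 'clear'
    )
    if not page:
        break
    for sync_result in page:
        print(sync_result.job_id, sync_result.get_job_status(), sync_result.start_time)
    offset += len(page)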
806@mcp_tool( 807 read_only=True, 808 idempotent=True, 809 open_world=True, 810 extra_help_text=CLOUD_AUTH_TIP_TEXT, 811) 812def list_deployed_cloud_source_connectors( 813 ctx: Context, 814 *, 815 workspace_id: Annotated[ 816 str | None, 817 Field( 818 description=WORKSPACE_ID_TIP_TEXT, 819 default=None, 820 ), 821 ], 822 name_contains: Annotated[ 823 str | None, 824 Field( 825 description="Optional case-insensitive substring to filter sources by name", 826 default=None, 827 ), 828 ], 829 max_items_limit: Annotated[ 830 int | None, 831 Field( 832 description="Optional maximum number of items to return (default: no limit)", 833 default=None, 834 ), 835 ], 836) -> list[CloudSourceResult]: 837 """List all deployed source connectors in the Airbyte Cloud workspace.""" 838 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 839 sources = workspace.list_sources() 840 841 # Filter by name if requested 842 if name_contains: 843 needle = name_contains.lower() 844 sources = [s for s in sources if s.name is not None and needle in s.name.lower()] 845 846 # Apply limit if requested 847 if max_items_limit is not None: 848 sources = sources[:max_items_limit] 849 850 # Note: name and url are guaranteed non-null from list API responses 851 return [ 852 CloudSourceResult( 853 id=source.source_id, 854 name=cast(str, source.name), 855 url=cast(str, source.connector_url), 856 ) 857 for source in sources 858 ]
List all deployed source connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
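Name filtering and the item limit are applied client-side on top of `workspace.list_sources()`; in sketch form (the substring and limit are illustrative).

sources = workspace.list_sources()
needle = "faker"  # illustrative filter
matches = [s for s in sources if s.name and needle in s.name.lower()]
for source in matches[:10]:  # illustrative limit
    print(source.source_id, source.name, source.connector_url)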
861@mcp_tool( 862 read_only=True, 863 idempotent=True, 864 open_world=True, 865 extra_help_text=CLOUD_AUTH_TIP_TEXT, 866) 867def list_deployed_cloud_destination_connectors( 868 ctx: Context, 869 *, 870 workspace_id: Annotated[ 871 str | None, 872 Field( 873 description=WORKSPACE_ID_TIP_TEXT, 874 default=None, 875 ), 876 ], 877 name_contains: Annotated[ 878 str | None, 879 Field( 880 description="Optional case-insensitive substring to filter destinations by name", 881 default=None, 882 ), 883 ], 884 max_items_limit: Annotated[ 885 int | None, 886 Field( 887 description="Optional maximum number of items to return (default: no limit)", 888 default=None, 889 ), 890 ], 891) -> list[CloudDestinationResult]: 892 """List all deployed destination connectors in the Airbyte Cloud workspace.""" 893 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 894 destinations = workspace.list_destinations() 895 896 # Filter by name if requested 897 if name_contains: 898 needle = name_contains.lower() 899 destinations = [d for d in destinations if d.name is not None and needle in d.name.lower()] 900 901 # Apply limit if requested 902 if max_items_limit is not None: 903 destinations = destinations[:max_items_limit] 904 905 # Note: name and url are guaranteed non-null from list API responses 906 return [ 907 CloudDestinationResult( 908 id=destination.destination_id, 909 name=cast(str, destination.name), 910 url=cast(str, destination.connector_url), 911 ) 912 for destination in destinations 913 ]
List all deployed destination connectors in the Airbyte Cloud workspace.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
916@mcp_tool( 917 read_only=True, 918 idempotent=True, 919 open_world=True, 920 extra_help_text=CLOUD_AUTH_TIP_TEXT, 921) 922def describe_cloud_source( 923 ctx: Context, 924 source_id: Annotated[ 925 str, 926 Field(description="The ID of the source to describe."), 927 ], 928 *, 929 workspace_id: Annotated[ 930 str | None, 931 Field( 932 description=WORKSPACE_ID_TIP_TEXT, 933 default=None, 934 ), 935 ], 936) -> CloudSourceDetails: 937 """Get detailed information about a specific deployed source connector.""" 938 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 939 source = workspace.get_source(source_id=source_id) 940 941 # Access name property to ensure _connector_info is populated 942 source_name = cast(str, source.name) 943 944 return CloudSourceDetails( 945 source_id=source.source_id, 946 source_name=source_name, 947 source_url=source.connector_url, 948 connector_definition_id=source._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 949 )
Get detailed information about a specific deployed source connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
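A sketch of the equivalent lookup; note that the definition ID is read from the connector's private `_connector_info`, mirroring the tool's implementation (the source ID is a placeholder).

source = workspace.get_source(source_id="<source-id>")  # placeholder ID
print(source.name, source.connector_url)  # accessing name populates _connector_info
print(source._connector_info.definition_id)  # noqa: SLF001 - private attribute, as in the tool above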
952@mcp_tool( 953 read_only=True, 954 idempotent=True, 955 open_world=True, 956 extra_help_text=CLOUD_AUTH_TIP_TEXT, 957) 958def describe_cloud_destination( 959 ctx: Context, 960 destination_id: Annotated[ 961 str, 962 Field(description="The ID of the destination to describe."), 963 ], 964 *, 965 workspace_id: Annotated[ 966 str | None, 967 Field( 968 description=WORKSPACE_ID_TIP_TEXT, 969 default=None, 970 ), 971 ], 972) -> CloudDestinationDetails: 973 """Get detailed information about a specific deployed destination connector.""" 974 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 975 destination = workspace.get_destination(destination_id=destination_id) 976 977 # Access name property to ensure _connector_info is populated 978 destination_name = cast(str, destination.name) 979 980 return CloudDestinationDetails( 981 destination_id=destination.destination_id, 982 destination_name=destination_name, 983 destination_url=destination.connector_url, 984 connector_definition_id=destination._connector_info.definition_id, # noqa: SLF001 # type: ignore[union-attr] 985 )
Get detailed information about a specific deployed destination connector.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
988@mcp_tool( 989 read_only=True, 990 idempotent=True, 991 open_world=True, 992 extra_help_text=CLOUD_AUTH_TIP_TEXT, 993) 994def describe_cloud_connection( 995 ctx: Context, 996 connection_id: Annotated[ 997 str, 998 Field(description="The ID of the connection to describe."), 999 ], 1000 *, 1001 workspace_id: Annotated[ 1002 str | None, 1003 Field( 1004 description=WORKSPACE_ID_TIP_TEXT, 1005 default=None, 1006 ), 1007 ], 1008) -> CloudConnectionDetails: 1009 """Get detailed information about a specific deployed connection.""" 1010 workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id) 1011 connection = workspace.get_connection(connection_id=connection_id) 1012 1013 return CloudConnectionDetails( 1014 connection_id=connection.connection_id, 1015 connection_name=cast(str, connection.name), 1016 connection_url=cast(str, connection.connection_url), 1017 source_id=connection.source_id, 1018 source_name=cast(str, connection.source.name), 1019 destination_id=connection.destination_id, 1020 destination_name=cast(str, connection.destination.name), 1021 selected_streams=connection.stream_names, 1022 table_prefix=connection.table_prefix, 1023 )
Get detailed information about a specific deployed connection.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
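Sketch of reading the same fields directly from the connection object (the connection ID is a placeholder; `workspace` is an authenticated `CloudWorkspace`).

connection = workspace.get_connection(connection_id="<connection-id>")
print(connection.name, connection.connection_url)
print(connection.source_id, connection.source.name)
print(connection.destination_id, connection.destination.name)
print(connection.stream_names, connection.table_prefix)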
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_cloud_sync_logs(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    job_id: Annotated[
        int | None,
        Field(description="Optional job ID. If not provided, the latest job will be used."),
    ] = None,
    attempt_number: Annotated[
        int | None,
        Field(
            description="Optional attempt number. If not provided, the latest attempt will be used."
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    max_lines: Annotated[
        int,
        Field(
            description=(
                "Maximum number of lines to return. "
                "Defaults to 4000 if not specified. "
                "If '0' is provided, no limit is applied."
            ),
            default=4000,
        ),
    ],
    from_tail: Annotated[
        bool | None,
        Field(
            description=(
                "Pull from the end of the log text if total lines is greater than 'max_lines'. "
                "Defaults to True if `line_offset` is not specified. "
                "Cannot combine `from_tail=True` with `line_offset`."
            ),
            default=None,
        ),
    ],
    line_offset: Annotated[
        int | None,
        Field(
            description=(
                "Number of lines to skip from the beginning of the logs. "
                "Cannot be combined with `from_tail=True`."
            ),
            default=None,
        ),
    ],
) -> LogReadResult:
    """Get the logs from a sync job attempt on Airbyte Cloud."""
    # Validate that line_offset and from_tail are not both set
    if line_offset is not None and from_tail:
        raise PyAirbyteInputError(
            message="Cannot specify both 'line_offset' and 'from_tail' parameters.",
            context={"line_offset": line_offset, "from_tail": from_tail},
        )

    if from_tail is None and line_offset is None:
        from_tail = True

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id)

    if not sync_result:
        raise AirbyteMissingResourceError(
            resource_type="sync job",
            resource_name_or_id=connection_id,
        )

    attempts = sync_result.get_attempts()

    if not attempts:
        raise AirbyteMissingResourceError(
            resource_type="sync attempt",
            resource_name_or_id=str(sync_result.job_id),
        )

    if attempt_number is not None:
        target_attempt = None
        for attempt in attempts:
            if attempt.attempt_number == attempt_number:
                target_attempt = attempt
                break

        if target_attempt is None:
            raise AirbyteMissingResourceError(
                resource_type="sync attempt",
                resource_name_or_id=f"job {sync_result.job_id}, attempt {attempt_number}",
            )
    else:
        target_attempt = max(attempts, key=lambda a: a.attempt_number)

    logs = target_attempt.get_full_log_text()

    if not logs:
        # Return empty result with zero lines
        return LogReadResult(
            log_text=(
                f"[No logs available for job '{sync_result.job_id}', "
                f"attempt {target_attempt.attempt_number}.]"
            ),
            log_text_start_line=1,
            log_text_line_count=0,
            total_log_lines_available=0,
            job_id=sync_result.job_id,
            attempt_number=target_attempt.attempt_number,
        )

    # Apply line limiting
    log_lines = logs.splitlines()
    total_lines = len(log_lines)

    # Determine effective max_lines (0 means no limit)
    effective_max = total_lines if max_lines == 0 else max_lines

    # Calculate start_index and slice based on from_tail or line_offset
    if from_tail:
        start_index = max(0, total_lines - effective_max)
        selected_lines = log_lines[start_index:][:effective_max]
    else:
        start_index = line_offset or 0
        selected_lines = log_lines[start_index : start_index + effective_max]

    return LogReadResult(
        log_text="\n".join(selected_lines),
        log_text_start_line=start_index + 1,  # Convert to 1-based index
        log_text_line_count=len(selected_lines),
        total_log_lines_available=total_lines,
        job_id=sync_result.job_id,
        attempt_number=target_attempt.attempt_number,
    )
Get the logs from a sync job attempt on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
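The max_lines, from_tail, and line_offset parameters describe a simple window over the split log text. A minimal sketch of the same slicing logic in isolation (the sample log text below is illustrative, not real sync output):

    # Hypothetical log text; the tool obtains the real text via attempt.get_full_log_text().
    log_lines = "line-1\nline-2\nline-3\nline-4\nline-5".splitlines()
    max_lines = 2

    # from_tail=True keeps the last `max_lines` lines.
    tail_window = log_lines[max(0, len(log_lines) - max_lines):]   # ['line-4', 'line-5']

    # line_offset skips lines from the beginning instead (mutually exclusive with from_tail=True).
    offset_window = log_lines[1 : 1 + max_lines]                   # ['line-2', 'line-3']

Because the result's log_text_start_line is 1-based and log_text_line_count reports the window size, a caller can page forward through a long log by increasing line_offset by log_text_line_count on each call.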
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_deployed_cloud_connections(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional case-insensitive substring to filter connections by name",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
    with_connection_status: Annotated[
        bool | None,
        Field(
            description="If True, include status info for each connection's most recent sync job",
            default=False,
        ),
    ],
    failing_connections_only: Annotated[
        bool | None,
        Field(
            description="If True, only return connections with failed/cancelled last sync",
            default=False,
        ),
    ],
) -> list[CloudConnectionResult]:
    """List all deployed connections in the Airbyte Cloud workspace.

    When with_connection_status is True, each connection result will include
    information about the most recent sync job status, skipping over any
    currently in-progress syncs to find the last completed job.

    When failing_connections_only is True, only connections where the most
    recent completed sync job failed or was cancelled will be returned.
    This implicitly enables with_connection_status.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connections = workspace.list_connections()

    # Filter by name if requested
    if name_contains:
        needle = name_contains.lower()
        connections = [c for c in connections if c.name is not None and needle in c.name.lower()]

    # If failing_connections_only is True, implicitly enable with_connection_status
    if failing_connections_only:
        with_connection_status = True

    results: list[CloudConnectionResult] = []

    for connection in connections:
        last_job_status: str | None = None
        last_job_id: int | None = None
        last_job_time: str | None = None
        currently_running_job_id: int | None = None
        currently_running_job_start_time: str | None = None

        if with_connection_status:
            sync_logs = connection.get_previous_sync_logs(limit=5)
            last_completed_job_status = None  # Keep enum for comparison

            for sync_result in sync_logs:
                job_status = sync_result.get_job_status()

                if not sync_result.is_job_complete():
                    currently_running_job_id = sync_result.job_id
                    currently_running_job_start_time = sync_result.start_time.isoformat()
                    continue

                last_completed_job_status = job_status
                last_job_status = str(job_status.value) if job_status else None
                last_job_id = sync_result.job_id
                last_job_time = sync_result.start_time.isoformat()
                break

            if failing_connections_only and (
                last_completed_job_status is None
                or last_completed_job_status not in FAILED_STATUSES
            ):
                continue

        results.append(
            CloudConnectionResult(
                id=connection.connection_id,
                name=cast(str, connection.name),
                url=cast(str, connection.connection_url),
                source_id=connection.source_id,
                destination_id=connection.destination_id,
                last_job_status=last_job_status,
                last_job_id=last_job_id,
                last_job_time=last_job_time,
                currently_running_job_id=currently_running_job_id,
                currently_running_job_start_time=currently_running_job_start_time,
            )
        )

        if max_items_limit is not None and len(results) >= max_items_limit:
            break

    return results
List all deployed connections in the Airbyte Cloud workspace.
When with_connection_status is True, each connection result will include information about the most recent sync job status, skipping over any currently in-progress syncs to find the last completed job.
When failing_connections_only is True, only connections where the most recent completed sync job failed or was cancelled will be returned. This implicitly enables with_connection_status.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
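The status lookup walks the most recent sync results for each connection and skips any job that has not yet finished. A rough sketch of the same check using the CloudWorkspace calls this tool wraps (assumes `workspace` is an already-authenticated CloudWorkspace instance):

    # Report the last *completed* job per connection, ignoring in-flight syncs.
    for connection in workspace.list_connections():
        for sync_result in connection.get_previous_sync_logs(limit=5):
            if not sync_result.is_job_complete():
                continue  # still running; keep looking for a finished job
            print(connection.name, sync_result.job_id, sync_result.get_job_status())
            break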
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
    name_contains: Annotated[
        str | None,
        Field(
            description="Optional substring to filter workspaces by name (server-side filtering)",
            default=None,
        ),
    ],
    max_items_limit: Annotated[
        int | None,
        Field(
            description="Optional maximum number of items to return (default: no limit)",
            default=None,
        ),
    ],
) -> list[CloudWorkspaceResult]:
    """List all workspaces in a specific organization.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool will NOT list workspaces across all organizations - you must specify
    which organization to list workspaces from.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    resolved_org_id = _resolve_organization_id(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    workspaces = api_util.list_workspaces_in_organization(
        organization_id=resolved_org_id,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
        name_contains=name_contains,
        max_items_limit=max_items_limit,
    )

    return [
        CloudWorkspaceResult(
            workspace_id=ws.get("workspaceId", ""),
            workspace_name=ws.get("name", ""),
            organization_id=ws.get("organizationId", ""),
        )
        for ws in workspaces
    ]
List all workspaces in a specific organization.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify which organization to list workspaces from.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
    ctx: Context,
    *,
    organization_id: Annotated[
        str | None,
        Field(
            description="Organization ID. Required if organization_name is not provided.",
            default=None,
        ),
    ],
    organization_name: Annotated[
        str | None,
        Field(
            description=(
                "Organization name (exact match). Required if organization_id is not provided."
            ),
            default=None,
        ),
    ],
) -> CloudOrganizationResult:
    """Get details about a specific organization including billing status.

    Requires either organization_id OR organization_name (exact match) to be provided.
    This tool is useful for looking up an organization's ID from its name, or vice versa.
    """
    bearer_token = get_mcp_config(ctx, MCP_CONFIG_BEARER_TOKEN)
    client_id = get_mcp_config(ctx, MCP_CONFIG_CLIENT_ID)
    client_secret = get_mcp_config(ctx, MCP_CONFIG_CLIENT_SECRET)
    api_url = get_mcp_config(ctx, MCP_CONFIG_API_URL) or api_util.CLOUD_API_ROOT

    org = _resolve_organization(
        organization_id=organization_id,
        organization_name=organization_name,
        api_root=api_url,
        client_id=SecretString(client_id) if client_id else None,
        client_secret=SecretString(client_secret) if client_secret else None,
        bearer_token=SecretString(bearer_token) if bearer_token else None,
    )

    # CloudOrganization has lazy loading of billing properties
    return CloudOrganizationResult(
        id=org.organization_id,
        name=org.organization_name,
        email=org.email,
        payment_status=org.payment_status,
        subscription_status=org.subscription_status,
        is_account_locked=org.is_account_locked,
    )
Get details about a specific organization including billing status.
Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def publish_custom_source_definition(
    ctx: Context,
    name: Annotated[
        str,
        Field(description="The name for the custom connector definition."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "The Low-code CDK manifest as a YAML string or file path. "
                "Required for YAML connectors."
            ),
            default=None,
        ),
    ] = None,
    unique: Annotated[
        bool,
        Field(
            description="Whether to require a unique name.",
            default=True,
        ),
    ] = True,
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before publishing.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project, allowing immediate test read operations."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Publish a custom YAML source connector definition to Airbyte Cloud.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    processed_manifest = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    custom_source = workspace.publish_custom_source_definition(
        name=name,
        manifest_yaml=processed_manifest,
        unique=unique,
        pre_validate=pre_validate,
        testing_values=testing_values_dict,
    )
    register_guid_created_in_session(custom_source.definition_id)
    return (
        "Successfully published custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
        + "\n"
    )
Publish a custom YAML source connector definition to Airbyte Cloud.
Note: Only YAML (declarative) connectors are currently supported.
Docker-based custom sources are not yet available.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
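A single-line string passed as manifest_yaml is treated as a file path, while a multi-line string is treated as the manifest itself. As a rough usage sketch of the underlying workspace call (assumes `workspace` is an already-authenticated CloudWorkspace; the name, file path, and testing values are placeholders):

    custom_source = workspace.publish_custom_source_definition(
        name="source-my-api (dev)",               # placeholder name
        manifest_yaml=Path("./manifest.yaml"),    # local Low-code CDK manifest file
        unique=True,
        pre_validate=True,
        testing_values={"api_key": "..."},        # hypothetical testing configuration
    )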
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def list_custom_source_definitions(
    ctx: Context,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> list[dict[str, Any]]:
    """List custom YAML source definitions in the Airbyte Cloud workspace.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definitions = workspace.list_custom_source_definitions(
        definition_type="yaml",
    )

    return [
        {
            "definition_id": d.definition_id,
            "name": d.name,
            "version": d.version,
            "connector_builder_project_url": d.connector_builder_project_url,
        }
        for d in definitions
    ]
List custom YAML source definitions in the Airbyte Cloud workspace.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
)
def get_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to retrieve."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a custom YAML source definition from Airbyte Cloud, including its manifest.

    Returns the full definition details including the manifest YAML content,
    which can be used to inspect or store the connector configuration locally.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )

    return {
        "definition_id": definition.definition_id,
        "name": definition.name,
        "version": definition.version,
        "connector_builder_project_id": definition.connector_builder_project_id,
        "connector_builder_project_url": definition.connector_builder_project_url,
        "manifest": definition.manifest,
    }
Get a custom YAML source definition from Airbyte Cloud, including its manifest.
Returns the full definition details including the manifest YAML content, which can be used to inspect or store the connector configuration locally.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
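A short sketch combining the two read-only tools above via the workspace methods they wrap (assumes `workspace` is an already-authenticated CloudWorkspace; the definition ID is a placeholder):

    # Enumerate YAML definitions, then fetch one to inspect its manifest.
    for d in workspace.list_custom_source_definitions(definition_type="yaml"):
        print(d.definition_id, d.name, d.version)

    definition = workspace.get_custom_source_definition(
        definition_id="<definition-id>",   # placeholder
        definition_type="yaml",
    )
    manifest = definition.manifest         # manifest content, e.g. to inspect or store locally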
@mcp_tool(
    destructive=True,
    open_world=True,
)
def update_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the definition to update."),
    ],
    manifest_yaml: Annotated[
        str | Path | None,
        Field(
            description=(
                "New manifest as YAML string or file path. "
                "Optional; omit to update only testing values."
            ),
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
    pre_validate: Annotated[
        bool,
        Field(
            description="Whether to validate the manifest client-side before updating.",
            default=True,
        ),
    ] = True,
    testing_values: Annotated[
        dict | str | None,
        Field(
            description=(
                "Optional testing configuration values for the Builder UI. "
                "Can be provided as a JSON object or JSON string. "
                "Supports inline secret refs via 'secret_reference::ENV_VAR_NAME' syntax. "
                "If provided, these values replace any existing testing values "
                "for the connector builder project. The entire testing values object "
                "is overwritten, so pass the full set of values you want to persist."
            ),
            default=None,
        ),
    ],
    testing_values_secret_name: Annotated[
        str | None,
        Field(
            description=(
                "Optional name of a secret containing testing configuration values "
                "in JSON or YAML format. The secret will be resolved by the MCP "
                "server and merged into testing_values, with secret values taking "
                "precedence. This lets the agent reference secrets without sending "
                "raw values as tool arguments."
            ),
            default=None,
        ),
    ],
) -> str:
    """Update a custom YAML source definition in Airbyte Cloud.

    Updates the manifest and/or testing values for an existing custom source definition.
    At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
    """
    check_guid_created_in_session(definition_id)

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)

    if manifest_yaml is None and testing_values is None and testing_values_secret_name is None:
        raise PyAirbyteInputError(
            message=(
                "At least one of manifest_yaml, testing_values, or testing_values_secret_name "
                "must be provided to update a custom source definition."
            ),
            context={
                "definition_id": definition_id,
                "workspace_id": workspace.workspace_id,
            },
        )

    processed_manifest: str | Path | None = manifest_yaml
    if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml:
        processed_manifest = Path(manifest_yaml)

    # Resolve testing values from inline config and/or secret
    testing_values_dict: dict[str, Any] | None = None
    if testing_values is not None or testing_values_secret_name is not None:
        testing_values_dict = (
            resolve_connector_config(
                config=testing_values,
                config_secret_name=testing_values_secret_name,
            )
            or None
        )

    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    custom_source: CustomCloudSourceDefinition = definition

    if processed_manifest is not None:
        custom_source = definition.update_definition(
            manifest_yaml=processed_manifest,
            pre_validate=pre_validate,
        )

    if testing_values_dict is not None:
        custom_source.set_testing_values(testing_values_dict)

    return (
        "Successfully updated custom YAML source definition:\n"
        + _get_custom_source_definition_description(
            custom_source=custom_source,
        )
    )
Update a custom YAML source definition in Airbyte Cloud.
Updates the manifest and/or testing values for an existing custom source definition. At least one of manifest_yaml, testing_values, or testing_values_secret_name must be provided.
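A minimal sketch of the update path using the same calls shown in the source above (assumes `workspace` is an already-authenticated CloudWorkspace; IDs, paths, and values are placeholders):

    definition = workspace.get_custom_source_definition(
        definition_id="<definition-id>",        # placeholder
        definition_type="yaml",
    )
    updated = definition.update_definition(
        manifest_yaml=Path("./manifest.yaml"),  # hypothetical local manifest file
        pre_validate=True,
    )
    # Testing values are replaced wholesale, so pass the complete set you want to persist.
    updated.set_testing_values({"api_key": "..."})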
@mcp_tool(
    destructive=True,
    open_world=True,
)
def permanently_delete_custom_source_definition(
    ctx: Context,
    definition_id: Annotated[
        str,
        Field(description="The ID of the custom source definition to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the custom source definition (for verification)."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Permanently delete a custom YAML source definition from Airbyte Cloud.

    IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connector does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connector appropriately to authorize
    the deletion.

    The provided name must match the actual name of the definition for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.

    Note: Only YAML (declarative) connectors are currently supported.
    Docker-based custom sources are not yet available.
    """
    check_guid_created_in_session(definition_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    definition = workspace.get_custom_source_definition(
        definition_id=definition_id,
        definition_type="yaml",
    )
    actual_name: str = definition.name

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the definition's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "definition_id": definition_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    definition.permanently_delete(
        safe_mode=True,  # Hard-coded safe mode for extra protection when running in LLM agents.
    )
    return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})"
Permanently delete a custom YAML source definition from Airbyte Cloud.
IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" (case insensitive).
If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion.
The provided name must match the actual name of the definition for the operation to proceed. This is a safety measure to ensure you are deleting the correct resource.
Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the source (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed source connector from Airbyte Cloud.

    IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the source does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the source appropriately to authorize
    the deletion.

    The provided name must match the actual name of the source for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    source = workspace.get_source(source_id=source_id)
    actual_name: str = cast(str, source.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the source's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "source_id": source_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_source(
        source=source_id,
        safe_mode=True,  # Requires name to contain "delete-me" or "deleteme" (case insensitive)
    )
    return f"Successfully deleted source '{actual_name}' (ID: {source_id})"
Permanently delete a deployed source connector from Airbyte Cloud.
IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" (case insensitive).
If the source does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the source appropriately to authorize the deletion.
The provided name must match the actual name of the source for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the destination (for verification)."),
    ],
) -> str:
    """Permanently delete a deployed destination connector from Airbyte Cloud.

    IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the destination does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the destination appropriately to authorize
    the deletion.

    The provided name must match the actual name of the destination for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    destination = workspace.get_destination(destination_id=destination_id)
    actual_name: str = cast(str, destination.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the destination's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "destination_id": destination_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_destination(
        destination=destination_id,
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
    )
    return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})"
Permanently delete a deployed destination connector from Airbyte Cloud.
IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" (case insensitive).
If the destination does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the destination appropriately to authorize the deletion.
The provided name must match the actual name of the destination for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def permanently_delete_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to delete."),
    ],
    name: Annotated[
        str,
        Field(description="The expected name of the connection (for verification)."),
    ],
    *,
    cascade_delete_source: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the source connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
    cascade_delete_destination: Annotated[
        bool,
        Field(
            description=(
                "Whether to also delete the destination connector associated with this connection."
            ),
            default=False,
        ),
    ] = False,
) -> str:
    """Permanently delete a connection from Airbyte Cloud.

    IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme"
    (case insensitive).

    If the connection does not meet this requirement, the deletion will be rejected with a
    helpful error message. Instruct the user to rename the connection appropriately to authorize
    the deletion.

    The provided name must match the actual name of the connection for the operation to proceed.
    This is a safety measure to ensure you are deleting the correct resource.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx)
    connection = workspace.get_connection(connection_id=connection_id)
    actual_name: str = cast(str, connection.name)

    # Verify the name matches
    if actual_name != name:
        raise PyAirbyteInputError(
            message=(
                f"Name mismatch: expected '{name}' but found '{actual_name}'. "
                "The provided name must exactly match the connection's actual name. "
                "This is a safety measure to prevent accidental deletion."
            ),
            context={
                "connection_id": connection_id,
                "expected_name": name,
                "actual_name": actual_name,
            },
        )

    # Safe mode is hard-coded to True for extra protection when running in LLM agents
    workspace.permanently_delete_connection(
        safe_mode=True,  # Requires name-based delete disposition ("delete-me" or "deleteme")
        connection=connection_id,
        cascade_delete_source=cascade_delete_source,
        cascade_delete_destination=cascade_delete_destination,
    )
    return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})"
Permanently delete a connection from Airbyte Cloud.
IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" (case insensitive).
If the connection does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connection appropriately to authorize the deletion.
The provided name must match the actual name of the connection for the operation to proceed.
This is a safety measure to ensure you are deleting the correct resource.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
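For a connection that was only created for testing, the cascade flags let a single call clean up the connection and its connectors. A hedged sketch using the workspace method the tool wraps (assumes `workspace` is an already-authenticated CloudWorkspace and that the connection has already been renamed to contain "delete-me"; the ID is a placeholder):

    workspace.permanently_delete_connection(
        connection="<connection-id>",   # placeholder
        safe_mode=True,                 # rejects resources not named "delete-me"/"deleteme"
        cascade_delete_source=True,
        cascade_delete_destination=True,
    )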
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_source(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the source."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed source connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)
    source.rename(name=name)
    return f"Successfully renamed source '{source_id}' to '{name}'. URL: {source.connector_url}"
Rename a deployed source connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_source_config(
    ctx: Context,
    source_id: Annotated[
        str,
        Field(description="The ID of the deployed source to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the source connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ] = None,
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed source connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(source_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    source = workspace.get_source(source_id=source_id)

    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    source.update_config(config=config_dict)
    return f"Successfully updated source '{source_id}'. URL: {source.connector_url}"
Update a deployed source connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
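A brief sketch of the underlying calls (assumes `workspace` is an already-authenticated CloudWorkspace; the source ID and configuration fields are placeholders for whatever the connector's spec actually requires):

    source = workspace.get_source(source_id="<source-id>")   # placeholder ID
    source.update_config(
        config={"host": "db.example.com", "port": 5432},     # hypothetical connector fields
    )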
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_destination(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a deployed destination connector on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)
    destination.rename(name=name)
    return (
        f"Successfully renamed destination '{destination_id}' to '{name}'. "
        f"URL: {destination.connector_url}"
    )
Rename a deployed destination connector on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_destination_config(
    ctx: Context,
    destination_id: Annotated[
        str,
        Field(description="The ID of the deployed destination to update."),
    ],
    config: Annotated[
        dict | str,
        Field(
            description="New configuration for the destination connector.",
        ),
    ],
    config_secret_name: Annotated[
        str | None,
        Field(
            description="The name of the secret containing the configuration.",
            default=None,
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a deployed destination connector's configuration on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    configuration is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(destination_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    destination = workspace.get_destination(destination_id=destination_id)

    config_dict = resolve_connector_config(
        config=config,
        config_secret_name=config_secret_name,
        config_spec_jsonschema=None,  # We don't have the spec here
    )

    destination.update_config(config=config_dict)
    return (
        f"Successfully updated destination '{destination_id}'. "
        f"URL: {destination.connector_url}"
    )
Update a deployed destination connector's configuration on Airbyte Cloud.
This is a destructive operation that can break existing connections if the configuration is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def rename_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to rename."),
    ],
    name: Annotated[
        str,
        Field(description="New name for the connection."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Rename a connection on Airbyte Cloud."""
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.rename(name=name)
    return (
        f"Successfully renamed connection '{connection_id}' to '{name}'. "
        f"URL: {connection.connection_url}"
    )
Rename a connection on Airbyte Cloud.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_table_prefix(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    prefix: Annotated[
        str,
        Field(description="New table prefix to use when syncing to the destination."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the table prefix for a connection on Airbyte Cloud.

    This is a destructive operation that can break downstream dependencies if the
    table prefix is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)
    connection.set_table_prefix(prefix=prefix)
    return (
        f"Successfully set table prefix for connection '{connection_id}' to '{prefix}'. "
        f"URL: {connection.connection_url}"
    )
Set the table prefix for a connection on Airbyte Cloud.
This is a destructive operation that can break downstream dependencies if the table prefix is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
@mcp_tool(
    destructive=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def set_cloud_connection_selected_streams(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    stream_names: Annotated[
        str | list[str],
        Field(
            description=(
                "The selected stream names to sync within the connection. "
                "Must be an explicit stream name or list of streams."
            )
        ),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Set the selected streams for a connection on Airbyte Cloud.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.
    """
    check_guid_created_in_session(connection_id)
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    resolved_streams_list: list[str] = resolve_list_of_strings(stream_names)
    connection.set_selected_streams(stream_names=resolved_streams_list)

    return (
        f"Successfully set selected streams for connection '{connection_id}' "
        f"to {resolved_streams_list}. URL: {connection.connection_url}"
    )
Set the selected streams for a connection on Airbyte Cloud.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
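The tool accepts either a single stream name or a list and normalizes the input with resolve_list_of_strings before applying it. A minimal sketch of the underlying call (assumes `workspace` is an already-authenticated CloudWorkspace; the ID and stream names are placeholders):

    connection = workspace.get_connection(connection_id="<connection-id>")  # placeholder
    connection.set_selected_streams(stream_names=["users", "orders"])       # hypothetical streams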
@mcp_tool(
    open_world=True,
    destructive=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def update_cloud_connection(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the connection to update."),
    ],
    *,
    enabled: Annotated[
        bool | None,
        Field(
            description=(
                "Set the connection's enabled status. "
                "True enables the connection (status='active'), "
                "False disables it (status='inactive'). "
                "Leave unset to keep the current status."
            ),
            default=None,
        ),
    ],
    cron_expression: Annotated[
        str | None,
        Field(
            description=(
                "A cron expression defining when syncs should run. "
                "Examples: '0 0 * * *' (daily at midnight UTC), "
                "'0 */6 * * *' (every 6 hours), "
                "'0 0 * * 0' (weekly on Sunday at midnight UTC). "
                "Leave unset to keep the current schedule. "
                "Cannot be used together with 'manual_schedule'."
            ),
            default=None,
        ),
    ],
    manual_schedule: Annotated[
        bool | None,
        Field(
            description=(
                "Set to True to disable automatic syncs (manual scheduling only). "
                "Syncs will only run when manually triggered. "
                "Cannot be used together with 'cron_expression'."
            ),
            default=None,
        ),
    ],
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> str:
    """Update a connection's settings on Airbyte Cloud.

    This tool allows updating multiple connection settings in a single call:
    - Enable or disable the connection
    - Set a cron schedule for automatic syncs
    - Switch to manual scheduling (no automatic syncs)

    At least one setting must be provided. The 'cron_expression' and 'manual_schedule'
    parameters are mutually exclusive.
    """
    check_guid_created_in_session(connection_id)

    # Validate that at least one setting is provided
    if enabled is None and cron_expression is None and manual_schedule is None:
        raise ValueError(
            "At least one setting must be provided: 'enabled', 'cron_expression', "
            "or 'manual_schedule'."
        )

    # Validate mutually exclusive schedule options
    if cron_expression is not None and manual_schedule is True:
        raise ValueError(
            "Cannot specify both 'cron_expression' and 'manual_schedule=True'. "
            "Use 'cron_expression' for scheduled syncs or 'manual_schedule=True' "
            "for manual-only syncs."
        )

    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    changes_made: list[str] = []

    # Apply enabled status change
    if enabled is not None:
        connection.set_enabled(enabled=enabled)
        status_str = "enabled" if enabled else "disabled"
        changes_made.append(f"status set to '{status_str}'")

    # Apply schedule change
    if cron_expression is not None:
        connection.set_schedule(cron_expression=cron_expression)
        changes_made.append(f"schedule set to '{cron_expression}'")
    elif manual_schedule is True:
        connection.set_manual_schedule()
        changes_made.append("schedule set to 'manual'")

    changes_summary = ", ".join(changes_made)
    return (
        f"Successfully updated connection '{connection_id}': {changes_summary}. "
        f"URL: {connection.connection_url}"
    )
Update a connection's settings on Airbyte Cloud.
This tool allows updating multiple connection settings in a single call:
- Enable or disable the connection
- Set a cron schedule for automatic syncs
- Switch to manual scheduling (no automatic syncs)
At least one setting must be provided. The 'cron_expression' and 'manual_schedule' parameters are mutually exclusive.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
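A compact sketch of the three settings via the connection methods the tool wraps (assumes `workspace` is an already-authenticated CloudWorkspace; the connection ID and cron expression are placeholders):

    connection = workspace.get_connection(connection_id="<connection-id>")  # placeholder
    connection.set_enabled(enabled=True)                     # status -> 'active'
    connection.set_schedule(cron_expression="0 */6 * * *")   # sync every 6 hours
    # ...or, instead of a cron schedule, disable automatic syncs entirely:
    # connection.set_manual_schedule()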
@mcp_tool(
    read_only=True,
    idempotent=True,
    open_world=True,
    extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def get_connection_artifact(
    ctx: Context,
    connection_id: Annotated[
        str,
        Field(description="The ID of the Airbyte Cloud connection."),
    ],
    artifact_type: Annotated[
        Literal["state", "catalog"],
        Field(description="The type of artifact to retrieve: 'state' or 'catalog'."),
    ],
    *,
    workspace_id: Annotated[
        str | None,
        Field(
            description=WORKSPACE_ID_TIP_TEXT,
            default=None,
        ),
    ],
) -> dict[str, Any]:
    """Get a connection artifact (state or catalog) from Airbyte Cloud.

    Retrieves the specified artifact for a connection:
    - 'state': Returns the full raw connection state including stateType and all
      state data, or {"ERROR": "..."} if no state is set.
    - 'catalog': Returns the configured catalog (syncCatalog) as a dict,
      or {"ERROR": "..."} if not found.
    """
    workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
    connection = workspace.get_connection(connection_id=connection_id)

    if artifact_type == "state":
        result = connection.dump_raw_state()
        if result.get("stateType") == "not_set":
            return {"ERROR": "No state is set for this connection (stateType: not_set)"}
        return result

    # artifact_type == "catalog"
    result = connection.dump_raw_catalog()
    if result is None:
        return {"ERROR": "No catalog found for this connection"}
    return result
Get a connection artifact (state or catalog) from Airbyte Cloud.
Retrieves the specified artifact for a connection:
- 'state': Returns the full raw connection state including stateType and all state data, or {"ERROR": "..."} if no state is set.
- 'catalog': Returns the configured catalog (syncCatalog) as a dict, or {"ERROR": "..."} if not found.
By default, the AIRBYTE_CLOUD_CLIENT_ID, AIRBYTE_CLOUD_CLIENT_SECRET, and AIRBYTE_CLOUD_WORKSPACE_ID environment variables will be used to authenticate with the Airbyte Cloud API.
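A short sketch of retrieving both artifacts directly from a connection object (assumes `workspace` is an already-authenticated CloudWorkspace; the connection ID is a placeholder):

    connection = workspace.get_connection(connection_id="<connection-id>")  # placeholder
    state = connection.dump_raw_state()       # includes 'stateType'; 'not_set' means no state yet
    catalog = connection.dump_raw_catalog()   # configured catalog (syncCatalog), or None if absent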
def register_cloud_tools(app: FastMCP) -> None:
    """Register cloud tools with the FastMCP app.

    Args:
        app: FastMCP application instance
    """
    register_mcp_tools(
        app,
        mcp_module=__name__,
        exclude_args=["workspace_id"] if AIRBYTE_CLOUD_WORKSPACE_ID_IS_SET else None,
    )
Register cloud tools with the FastMCP app.
Arguments:
- app: FastMCP application instance
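A minimal sketch of wiring these tools into a FastMCP server (the server name is illustrative):

    from fastmcp import FastMCP

    from airbyte.mcp.cloud import register_cloud_tools

    app = FastMCP("airbyte-cloud")   # hypothetical server name
    register_cloud_tools(app)

When AIRBYTE_CLOUD_WORKSPACE_ID is set in the environment, the workspace_id argument is excluded from the registered tool signatures, as shown by the exclude_args logic in the source above.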