# airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.

## Usage Examples

Get a new workspace object and deploy a source to it:
import airbyte as ab
from airbyte import cloud
workspace = cloud.CloudWorkspace(
workspace_id="...",
client_id="...",
client_secret="...",
)
# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
name="test-source",
source=source,
)
# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)
# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API. 3 4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances, 5both OSS and Enterprise. 6 7## Usage Examples 8 9Get a new workspace object and deploy a source to it: 10 11```python 12import airbyte as ab 13from airbyte import cloud 14 15workspace = cloud.CloudWorkspace( 16 workspace_id="...", 17 client_id="...", 18 client_secret="...", 19) 20 21# Deploy a source to the workspace 22source = ab.get_source("source-faker", config={"count": 100}) 23deployed_source = workspace.deploy_source( 24 name="test-source", 25 source=source, 26) 27 28# Run a check on the deployed source and raise an exception if the check fails 29check_result = deployed_source.check(raise_on_error=True) 30 31# Permanently delete the newly-created source 32workspace.permanently_delete_source(deployed_source) 33``` 34""" 35 36from __future__ import annotations 37 38from dataclasses import dataclass, field 39from functools import cached_property 40from pathlib import Path 41from typing import TYPE_CHECKING, Any, Literal, overload 42 43import yaml 44 45from airbyte import exceptions as exc 46from airbyte._util import api_util, text_util 47from airbyte._util.api_util import get_web_url_root 48from airbyte.cloud.auth import ( 49 resolve_cloud_api_url, 50 resolve_cloud_bearer_token, 51 resolve_cloud_client_id, 52 resolve_cloud_client_secret, 53 resolve_cloud_workspace_id, 54) 55from airbyte.cloud.client_config import CloudClientConfig 56from airbyte.cloud.connections import CloudConnection 57from airbyte.cloud.connectors import ( 58 CloudDestination, 59 CloudSource, 60 CustomCloudSourceDefinition, 61) 62from airbyte.destinations.base import Destination 63from airbyte.exceptions import AirbyteError 64from airbyte.secrets.base import SecretString 65 66 67if TYPE_CHECKING: 68 from collections.abc import Callable 69 70 from 
airbyte.sources.base import Source 71 72 73class CloudOrganization: 74 """Information about an organization in Airbyte Cloud. 75 76 This class provides lazy loading of organization attributes including billing status. 77 It is typically created via CloudWorkspace.get_organization(). 78 """ 79 80 def __init__( 81 self, 82 organization_id: str, 83 organization_name: str | None = None, 84 email: str | None = None, 85 *, 86 api_root: str = api_util.CLOUD_API_ROOT, 87 client_id: SecretString | None = None, 88 client_secret: SecretString | None = None, 89 bearer_token: SecretString | None = None, 90 ) -> None: 91 """Initialize a CloudOrganization. 92 93 Args: 94 organization_id: The organization ID. 95 organization_name: Display name of the organization. 96 email: Email associated with the organization. 97 api_root: The API root URL. 98 client_id: OAuth client ID for authentication. 99 client_secret: OAuth client secret for authentication. 100 bearer_token: Bearer token for authentication (alternative to client credentials). 101 """ 102 self.organization_id = organization_id 103 """The organization ID.""" 104 105 self._organization_name = organization_name 106 """Display name of the organization.""" 107 108 self._email = email 109 """Email associated with the organization.""" 110 111 self._api_root = api_root 112 self._client_id = client_id 113 self._client_secret = client_secret 114 self._bearer_token = bearer_token 115 116 # Cached organization info (billing, etc.) 117 self._organization_info: dict[str, Any] | None = None 118 # Flag to remember if fetching organization info failed (e.g., permission issues) 119 self._organization_info_fetch_failed: bool = False 120 121 def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]: 122 """Fetch and cache organization info including billing status. 123 124 If fetching fails (e.g., due to permission issues), the failure is cached and 125 subsequent calls will return an empty dict without retrying. 
126 127 Args: 128 force_refresh: If True, always fetch from the API even if cached. 129 130 Returns: 131 Dictionary containing organization info including billing data. 132 Returns empty dict if fetching failed or is not permitted. 133 """ 134 # Reset failure flag if force_refresh is requested 135 if force_refresh: 136 self._organization_info_fetch_failed = False 137 138 # If we already know fetching failed, return empty dict without retrying 139 if self._organization_info_fetch_failed: 140 return {} 141 142 if not force_refresh and self._organization_info is not None: 143 return self._organization_info 144 145 try: 146 self._organization_info = api_util.get_organization_info( 147 organization_id=self.organization_id, 148 api_root=self._api_root, 149 client_id=self._client_id, 150 client_secret=self._client_secret, 151 bearer_token=self._bearer_token, 152 ) 153 except Exception: 154 # Cache the failure so we don't retry on subsequent property accesses 155 self._organization_info_fetch_failed = True 156 return {} 157 else: 158 return self._organization_info 159 160 @property 161 def organization_name(self) -> str | None: 162 """Display name of the organization.""" 163 if self._organization_name is not None: 164 return self._organization_name 165 # Try to fetch from API if not set (returns empty dict on failure) 166 info = self._fetch_organization_info() 167 return info.get("organizationName") 168 169 @property 170 def email(self) -> str | None: 171 """Email associated with the organization.""" 172 if self._email is not None: 173 return self._email 174 # Try to fetch from API if not set (returns empty dict on failure) 175 info = self._fetch_organization_info() 176 return info.get("email") 177 178 @property 179 def payment_status(self) -> str | None: 180 """Payment status of the organization. 181 182 Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'. 183 When 'disabled', syncs are blocked due to unpaid invoices. 
184 Returns None if billing info is not available (e.g., due to permission issues). 185 """ 186 info = self._fetch_organization_info() 187 return (info.get("billing") or {}).get("paymentStatus") 188 189 @property 190 def subscription_status(self) -> str | None: 191 """Subscription status of the organization. 192 193 Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'. 194 Returns None if billing info is not available (e.g., due to permission issues). 195 """ 196 info = self._fetch_organization_info() 197 return (info.get("billing") or {}).get("subscriptionStatus") 198 199 @property 200 def is_account_locked(self) -> bool: 201 """Whether the account is locked due to billing issues. 202 203 Returns True if payment_status is 'disabled'/'locked' or subscription_status is 204 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state. 205 """ 206 return api_util.is_account_locked(self.payment_status, self.subscription_status) 207 208 209@dataclass 210class CloudWorkspace: 211 """A remote workspace on the Airbyte Cloud. 212 213 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 214 instances, both OSS and Enterprise. 215 216 Two authentication methods are supported (mutually exclusive): 217 1. OAuth2 client credentials (client_id + client_secret) 218 2. 
Bearer token authentication 219 220 Example with client credentials: 221 ```python 222 workspace = CloudWorkspace( 223 workspace_id="...", 224 client_id="...", 225 client_secret="...", 226 ) 227 ``` 228 229 Example with bearer token: 230 ```python 231 workspace = CloudWorkspace( 232 workspace_id="...", 233 bearer_token="...", 234 ) 235 ``` 236 """ 237 238 workspace_id: str 239 client_id: SecretString | None = None 240 client_secret: SecretString | None = None 241 api_root: str = api_util.CLOUD_API_ROOT 242 bearer_token: SecretString | None = None 243 244 # Internal credentials object (set in __post_init__, excluded from __init__) 245 _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False) 246 247 def __post_init__(self) -> None: 248 """Validate and initialize credentials.""" 249 # Wrap secrets in SecretString if provided 250 if self.client_id is not None: 251 self.client_id = SecretString(self.client_id) 252 if self.client_secret is not None: 253 self.client_secret = SecretString(self.client_secret) 254 if self.bearer_token is not None: 255 self.bearer_token = SecretString(self.bearer_token) 256 257 # Create internal CloudClientConfig object (validates mutual exclusivity) 258 self._credentials = CloudClientConfig( 259 client_id=self.client_id, 260 client_secret=self.client_secret, 261 bearer_token=self.bearer_token, 262 api_root=self.api_root, 263 ) 264 265 @classmethod 266 def from_env( 267 cls, 268 workspace_id: str | None = None, 269 *, 270 api_root: str | None = None, 271 ) -> CloudWorkspace: 272 """Create a CloudWorkspace using credentials from environment variables. 273 274 This factory method resolves credentials from environment variables, 275 providing a convenient way to create a workspace without explicitly 276 passing credentials. 277 278 Two authentication methods are supported (mutually exclusive): 279 1. Bearer token (checked first) 280 2. 
OAuth2 client credentials (fallback) 281 282 Environment variables used: 283 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 284 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 285 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 286 - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument). 287 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 288 289 Args: 290 workspace_id: The workspace ID. If not provided, will be resolved from 291 the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable. 292 api_root: The API root URL. If not provided, will be resolved from 293 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 294 the Airbyte Cloud API. 295 296 Returns: 297 A CloudWorkspace instance configured with credentials from the environment. 298 299 Raises: 300 PyAirbyteSecretNotFoundError: If required credentials are not found in 301 the environment. 
302 303 Example: 304 ```python 305 # With workspace_id from environment 306 workspace = CloudWorkspace.from_env() 307 308 # With explicit workspace_id 309 workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id") 310 ``` 311 """ 312 resolved_api_root = resolve_cloud_api_url(api_root) 313 314 # Try bearer token first 315 bearer_token = resolve_cloud_bearer_token() 316 if bearer_token: 317 return cls( 318 workspace_id=resolve_cloud_workspace_id(workspace_id), 319 bearer_token=bearer_token, 320 api_root=resolved_api_root, 321 ) 322 323 # Fall back to client credentials 324 return cls( 325 workspace_id=resolve_cloud_workspace_id(workspace_id), 326 client_id=resolve_cloud_client_id(), 327 client_secret=resolve_cloud_client_secret(), 328 api_root=resolved_api_root, 329 ) 330 331 @property 332 def workspace_url(self) -> str | None: 333 """The web URL of the workspace.""" 334 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}" 335 336 @cached_property 337 def _organization_info(self) -> dict[str, Any]: 338 """Fetch and cache organization info for this workspace. 339 340 Uses the Config API endpoint for an efficient O(1) lookup. 341 This is an internal method; use get_organization() for public access. 342 """ 343 return api_util.get_workspace_organization_info( 344 workspace_id=self.workspace_id, 345 api_root=self.api_root, 346 client_id=self.client_id, 347 client_secret=self.client_secret, 348 bearer_token=self.bearer_token, 349 ) 350 351 @overload 352 def get_organization(self) -> CloudOrganization: ... 353 354 @overload 355 def get_organization( 356 self, 357 *, 358 raise_on_error: Literal[True], 359 ) -> CloudOrganization: ... 360 361 @overload 362 def get_organization( 363 self, 364 *, 365 raise_on_error: Literal[False], 366 ) -> CloudOrganization | None: ... 367 368 def get_organization( 369 self, 370 *, 371 raise_on_error: bool = True, 372 ) -> CloudOrganization | None: 373 """Get the organization this workspace belongs to. 
374 375 Fetching organization info requires ORGANIZATION_READER permissions on the organization, 376 which may not be available with workspace-scoped credentials. 377 378 Args: 379 raise_on_error: If True (default), raises AirbyteError on permission or API errors. 380 If False, returns None instead of raising. 381 382 Returns: 383 CloudOrganization object with organization_id and organization_name, 384 or None if raise_on_error=False and an error occurred. 385 386 Raises: 387 AirbyteError: If raise_on_error=True and the organization info cannot be fetched 388 (e.g., due to insufficient permissions or missing data). 389 """ 390 try: 391 info = self._organization_info 392 except (AirbyteError, NotImplementedError): 393 if raise_on_error: 394 raise 395 return None 396 397 organization_id = info.get("organizationId") 398 organization_name = info.get("organizationName") 399 400 # Validate that both organization_id and organization_name are non-null and non-empty 401 if not organization_id or not organization_name: 402 if raise_on_error: 403 raise AirbyteError( 404 message="Organization info is incomplete.", 405 context={ 406 "organization_id": organization_id, 407 "organization_name": organization_name, 408 }, 409 ) 410 return None 411 412 return CloudOrganization( 413 organization_id=organization_id, 414 organization_name=organization_name, 415 api_root=self.api_root, 416 client_id=self.client_id, 417 client_secret=self.client_secret, 418 bearer_token=self.bearer_token, 419 ) 420 421 # Test connection and creds 422 423 def connect(self) -> None: 424 """Check that the workspace is reachable and raise an exception otherwise. 425 426 Note: It is not necessary to call this method before calling other operations. It 427 serves primarily as a simple check to ensure that the workspace is reachable 428 and credentials are correct. 
429 """ 430 _ = api_util.get_workspace( 431 api_root=self.api_root, 432 workspace_id=self.workspace_id, 433 client_id=self.client_id, 434 client_secret=self.client_secret, 435 bearer_token=self.bearer_token, 436 ) 437 print(f"Successfully connected to workspace: {self.workspace_url}") 438 439 # Get sources, destinations, and connections 440 441 def get_connection( 442 self, 443 connection_id: str, 444 ) -> CloudConnection: 445 """Get a connection by ID. 446 447 This method does not fetch data from the API. It returns a `CloudConnection` object, 448 which will be loaded lazily as needed. 449 """ 450 return CloudConnection( 451 workspace=self, 452 connection_id=connection_id, 453 ) 454 455 def get_source( 456 self, 457 source_id: str, 458 ) -> CloudSource: 459 """Get a source by ID. 460 461 This method does not fetch data from the API. It returns a `CloudSource` object, 462 which will be loaded lazily as needed. 463 """ 464 return CloudSource( 465 workspace=self, 466 connector_id=source_id, 467 ) 468 469 def get_destination( 470 self, 471 destination_id: str, 472 ) -> CloudDestination: 473 """Get a destination by ID. 474 475 This method does not fetch data from the API. It returns a `CloudDestination` object, 476 which will be loaded lazily as needed. 477 """ 478 return CloudDestination( 479 workspace=self, 480 connector_id=destination_id, 481 ) 482 483 # Deploy sources and destinations 484 485 def deploy_source( 486 self, 487 name: str, 488 source: Source, 489 *, 490 unique: bool = True, 491 random_name_suffix: bool = False, 492 ) -> CloudSource: 493 """Deploy a source to the workspace. 494 495 Returns the newly deployed source. 496 497 Args: 498 name: The name to use when deploying. 499 source: The source object to deploy. 500 unique: Whether to require a unique name. If `True`, duplicate names 501 are not allowed. Defaults to `True`. 502 random_name_suffix: Whether to append a random suffix to the name. 
503 """ 504 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 505 source_config_dict["sourceType"] = source.name.replace("source-", "") 506 507 if random_name_suffix: 508 name += f" (ID: {text_util.generate_random_suffix()})" 509 510 if unique: 511 existing = self.list_sources(name=name) 512 if existing: 513 raise exc.AirbyteDuplicateResourcesError( 514 resource_type="source", 515 resource_name=name, 516 ) 517 518 deployed_source = api_util.create_source( 519 name=name, 520 api_root=self.api_root, 521 workspace_id=self.workspace_id, 522 config=source_config_dict, 523 client_id=self.client_id, 524 client_secret=self.client_secret, 525 bearer_token=self.bearer_token, 526 ) 527 return CloudSource( 528 workspace=self, 529 connector_id=deployed_source.source_id, 530 ) 531 532 def deploy_destination( 533 self, 534 name: str, 535 destination: Destination | dict[str, Any], 536 *, 537 unique: bool = True, 538 random_name_suffix: bool = False, 539 ) -> CloudDestination: 540 """Deploy a destination to the workspace. 541 542 Returns the newly deployed destination ID. 543 544 Args: 545 name: The name to use when deploying. 546 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 547 dictionary of configuration values. 548 unique: Whether to require a unique name. If `True`, duplicate names 549 are not allowed. Defaults to `True`. 550 random_name_suffix: Whether to append a random suffix to the name. 
551 """ 552 if isinstance(destination, Destination): 553 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 554 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 555 # raise ValueError(destination_conf_dict) 556 else: 557 destination_conf_dict = destination.copy() 558 if "destinationType" not in destination_conf_dict: 559 raise exc.PyAirbyteInputError( 560 message="Missing `destinationType` in configuration dictionary.", 561 ) 562 563 if random_name_suffix: 564 name += f" (ID: {text_util.generate_random_suffix()})" 565 566 if unique: 567 existing = self.list_destinations(name=name) 568 if existing: 569 raise exc.AirbyteDuplicateResourcesError( 570 resource_type="destination", 571 resource_name=name, 572 ) 573 574 deployed_destination = api_util.create_destination( 575 name=name, 576 api_root=self.api_root, 577 workspace_id=self.workspace_id, 578 config=destination_conf_dict, # Wants a dataclass but accepts dict 579 client_id=self.client_id, 580 client_secret=self.client_secret, 581 bearer_token=self.bearer_token, 582 ) 583 return CloudDestination( 584 workspace=self, 585 connector_id=deployed_destination.destination_id, 586 ) 587 588 def permanently_delete_source( 589 self, 590 source: str | CloudSource, 591 *, 592 safe_mode: bool = True, 593 ) -> None: 594 """Delete a source from the workspace. 595 596 You can pass either the source ID `str` or a deployed `Source` object. 597 598 Args: 599 source: The source ID or CloudSource object to delete 600 safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" 601 (case insensitive) to prevent accidental deletion. Defaults to True. 
602 """ 603 if not isinstance(source, (str, CloudSource)): 604 raise exc.PyAirbyteInputError( 605 message="Invalid source type.", 606 input_value=type(source).__name__, 607 ) 608 609 api_util.delete_source( 610 source_id=source.connector_id if isinstance(source, CloudSource) else source, 611 source_name=source.name if isinstance(source, CloudSource) else None, 612 api_root=self.api_root, 613 client_id=self.client_id, 614 client_secret=self.client_secret, 615 bearer_token=self.bearer_token, 616 safe_mode=safe_mode, 617 ) 618 619 # Deploy and delete destinations 620 621 def permanently_delete_destination( 622 self, 623 destination: str | CloudDestination, 624 *, 625 safe_mode: bool = True, 626 ) -> None: 627 """Delete a deployed destination from the workspace. 628 629 You can pass either the `Cache` class or the deployed destination ID as a `str`. 630 631 Args: 632 destination: The destination ID or CloudDestination object to delete 633 safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" 634 (case insensitive) to prevent accidental deletion. Defaults to True. 
635 """ 636 if not isinstance(destination, (str, CloudDestination)): 637 raise exc.PyAirbyteInputError( 638 message="Invalid destination type.", 639 input_value=type(destination).__name__, 640 ) 641 642 api_util.delete_destination( 643 destination_id=( 644 destination if isinstance(destination, str) else destination.destination_id 645 ), 646 destination_name=( 647 destination.name if isinstance(destination, CloudDestination) else None 648 ), 649 api_root=self.api_root, 650 client_id=self.client_id, 651 client_secret=self.client_secret, 652 bearer_token=self.bearer_token, 653 safe_mode=safe_mode, 654 ) 655 656 # Deploy and delete connections 657 658 def deploy_connection( 659 self, 660 connection_name: str, 661 *, 662 source: CloudSource | str, 663 selected_streams: list[str], 664 destination: CloudDestination | str, 665 table_prefix: str | None = None, 666 ) -> CloudConnection: 667 """Create a new connection between an already deployed source and destination. 668 669 Returns the newly deployed connection object. 670 671 Args: 672 connection_name: The name of the connection. 673 source: The deployed source. You can pass a source ID or a CloudSource object. 674 destination: The deployed destination. You can pass a destination ID or a 675 CloudDestination object. 676 table_prefix: Optional. The table prefix to use when syncing to the destination. 677 selected_streams: The selected stream names to sync within the connection. 678 """ 679 if not selected_streams: 680 raise exc.PyAirbyteInputError( 681 guidance="You must provide `selected_streams` when creating a connection." 
682 ) 683 684 source_id: str = source if isinstance(source, str) else source.connector_id 685 destination_id: str = ( 686 destination if isinstance(destination, str) else destination.connector_id 687 ) 688 689 deployed_connection = api_util.create_connection( 690 name=connection_name, 691 source_id=source_id, 692 destination_id=destination_id, 693 api_root=self.api_root, 694 workspace_id=self.workspace_id, 695 selected_stream_names=selected_streams, 696 prefix=table_prefix or "", 697 client_id=self.client_id, 698 client_secret=self.client_secret, 699 bearer_token=self.bearer_token, 700 ) 701 702 return CloudConnection( 703 workspace=self, 704 connection_id=deployed_connection.connection_id, 705 source=deployed_connection.source_id, 706 destination=deployed_connection.destination_id, 707 ) 708 709 def permanently_delete_connection( 710 self, 711 connection: str | CloudConnection, 712 *, 713 cascade_delete_source: bool = False, 714 cascade_delete_destination: bool = False, 715 safe_mode: bool = True, 716 ) -> None: 717 """Delete a deployed connection from the workspace. 718 719 Args: 720 connection: The connection ID or CloudConnection object to delete 721 cascade_delete_source: If True, also delete the source after deleting the connection 722 cascade_delete_destination: If True, also delete the destination after deleting 723 the connection 724 safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" 725 (case insensitive) to prevent accidental deletion. Defaults to True. Also applies 726 to cascade deletes. 
727 """ 728 if connection is None: 729 raise ValueError("No connection ID provided.") 730 731 if isinstance(connection, str): 732 connection = CloudConnection( 733 workspace=self, 734 connection_id=connection, 735 ) 736 737 api_util.delete_connection( 738 connection_id=connection.connection_id, 739 connection_name=connection.name, 740 api_root=self.api_root, 741 workspace_id=self.workspace_id, 742 client_id=self.client_id, 743 client_secret=self.client_secret, 744 bearer_token=self.bearer_token, 745 safe_mode=safe_mode, 746 ) 747 748 if cascade_delete_source: 749 self.permanently_delete_source( 750 source=connection.source_id, 751 safe_mode=safe_mode, 752 ) 753 if cascade_delete_destination: 754 self.permanently_delete_destination( 755 destination=connection.destination_id, 756 safe_mode=safe_mode, 757 ) 758 759 # List sources, destinations, and connections 760 761 def list_connections( 762 self, 763 name: str | None = None, 764 *, 765 name_filter: Callable | None = None, 766 ) -> list[CloudConnection]: 767 """List connections by name in the workspace. 768 769 TODO: Add pagination support 770 """ 771 connections = api_util.list_connections( 772 api_root=self.api_root, 773 workspace_id=self.workspace_id, 774 name=name, 775 name_filter=name_filter, 776 client_id=self.client_id, 777 client_secret=self.client_secret, 778 bearer_token=self.bearer_token, 779 ) 780 return [ 781 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 782 workspace=self, 783 connection_response=connection, 784 ) 785 for connection in connections 786 if name is None or connection.name == name 787 ] 788 789 def list_sources( 790 self, 791 name: str | None = None, 792 *, 793 name_filter: Callable | None = None, 794 ) -> list[CloudSource]: 795 """List all sources in the workspace. 
796 797 TODO: Add pagination support 798 """ 799 sources = api_util.list_sources( 800 api_root=self.api_root, 801 workspace_id=self.workspace_id, 802 name=name, 803 name_filter=name_filter, 804 client_id=self.client_id, 805 client_secret=self.client_secret, 806 bearer_token=self.bearer_token, 807 ) 808 return [ 809 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 810 workspace=self, 811 source_response=source, 812 ) 813 for source in sources 814 if name is None or source.name == name 815 ] 816 817 def list_destinations( 818 self, 819 name: str | None = None, 820 *, 821 name_filter: Callable | None = None, 822 ) -> list[CloudDestination]: 823 """List all destinations in the workspace. 824 825 TODO: Add pagination support 826 """ 827 destinations = api_util.list_destinations( 828 api_root=self.api_root, 829 workspace_id=self.workspace_id, 830 name=name, 831 name_filter=name_filter, 832 client_id=self.client_id, 833 client_secret=self.client_secret, 834 bearer_token=self.bearer_token, 835 ) 836 return [ 837 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 838 workspace=self, 839 destination_response=destination, 840 ) 841 for destination in destinations 842 if name is None or destination.name == name 843 ] 844 845 def publish_custom_source_definition( 846 self, 847 name: str, 848 *, 849 manifest_yaml: dict[str, Any] | Path | str | None = None, 850 docker_image: str | None = None, 851 docker_tag: str | None = None, 852 unique: bool = True, 853 pre_validate: bool = True, 854 testing_values: dict[str, Any] | None = None, 855 ) -> CustomCloudSourceDefinition: 856 """Publish a custom source connector definition. 857 858 You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image 859 and docker_tag (for Docker connectors), but not both. 
860 861 Args: 862 name: Display name for the connector definition 863 manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string) 864 docker_image: Docker repository (e.g., 'airbyte/source-custom') 865 docker_tag: Docker image tag (e.g., '1.0.0') 866 unique: Whether to enforce name uniqueness 867 pre_validate: Whether to validate manifest client-side (YAML only) 868 testing_values: Optional configuration values to use for testing in the 869 Connector Builder UI. If provided, these values are stored as the complete 870 testing values object for the connector builder project (replaces any existing 871 values), allowing immediate test read operations. 872 873 Returns: 874 CustomCloudSourceDefinition object representing the created definition 875 876 Raises: 877 PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided 878 AirbyteDuplicateResourcesError: If unique=True and name already exists 879 """ 880 is_yaml = manifest_yaml is not None 881 is_docker = docker_image is not None 882 883 if is_yaml == is_docker: 884 raise exc.PyAirbyteInputError( 885 message=( 886 "Must specify EITHER manifest_yaml (for YAML connectors) OR " 887 "docker_image + docker_tag (for Docker connectors), but not both" 888 ), 889 context={ 890 "manifest_yaml_provided": is_yaml, 891 "docker_image_provided": is_docker, 892 }, 893 ) 894 895 if is_docker and docker_tag is None: 896 raise exc.PyAirbyteInputError( 897 message="docker_tag is required when docker_image is specified", 898 context={"docker_image": docker_image}, 899 ) 900 901 if unique: 902 existing = self.list_custom_source_definitions( 903 definition_type="yaml" if is_yaml else "docker", 904 ) 905 if any(d.name == name for d in existing): 906 raise exc.AirbyteDuplicateResourcesError( 907 resource_type="custom_source_definition", 908 resource_name=name, 909 ) 910 911 if is_yaml: 912 manifest_dict: dict[str, Any] 913 if isinstance(manifest_yaml, Path): 914 manifest_dict = 
yaml.safe_load(manifest_yaml.read_text()) 915 elif isinstance(manifest_yaml, str): 916 manifest_dict = yaml.safe_load(manifest_yaml) 917 elif manifest_yaml is not None: 918 manifest_dict = manifest_yaml 919 else: 920 raise exc.PyAirbyteInputError( 921 message="manifest_yaml is required for YAML connectors", 922 context={"name": name}, 923 ) 924 925 if pre_validate: 926 api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) 927 928 result = api_util.create_custom_yaml_source_definition( 929 name=name, 930 workspace_id=self.workspace_id, 931 manifest=manifest_dict, 932 api_root=self.api_root, 933 client_id=self.client_id, 934 client_secret=self.client_secret, 935 bearer_token=self.bearer_token, 936 ) 937 custom_definition = CustomCloudSourceDefinition._from_yaml_response( # noqa: SLF001 938 self, result 939 ) 940 941 # Set testing values if provided 942 if testing_values is not None: 943 custom_definition.set_testing_values(testing_values) 944 945 return custom_definition 946 947 raise NotImplementedError( 948 "Docker custom source definitions are not yet supported. " 949 "Only YAML manifest-based custom sources are currently available." 950 ) 951 952 def list_custom_source_definitions( 953 self, 954 *, 955 definition_type: Literal["yaml", "docker"], 956 ) -> list[CustomCloudSourceDefinition]: 957 """List custom source connector definitions. 958 959 Args: 960 definition_type: Connector type to list ("yaml" or "docker"). Required. 
961 962 Returns: 963 List of CustomCloudSourceDefinition objects matching the specified type 964 """ 965 if definition_type == "yaml": 966 yaml_definitions = api_util.list_custom_yaml_source_definitions( 967 workspace_id=self.workspace_id, 968 api_root=self.api_root, 969 client_id=self.client_id, 970 client_secret=self.client_secret, 971 bearer_token=self.bearer_token, 972 ) 973 return [ 974 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 975 for d in yaml_definitions 976 ] 977 978 raise NotImplementedError( 979 "Docker custom source definitions are not yet supported. " 980 "Only YAML manifest-based custom sources are currently available." 981 ) 982 983 def get_custom_source_definition( 984 self, 985 definition_id: str, 986 *, 987 definition_type: Literal["yaml", "docker"], 988 ) -> CustomCloudSourceDefinition: 989 """Get a specific custom source definition by ID. 990 991 Args: 992 definition_id: The definition ID 993 definition_type: Connector type ("yaml" or "docker"). Required. 994 995 Returns: 996 CustomCloudSourceDefinition object 997 """ 998 if definition_type == "yaml": 999 result = api_util.get_custom_yaml_source_definition( 1000 workspace_id=self.workspace_id, 1001 definition_id=definition_id, 1002 api_root=self.api_root, 1003 client_id=self.client_id, 1004 client_secret=self.client_secret, 1005 bearer_token=self.bearer_token, 1006 ) 1007 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 1008 1009 raise NotImplementedError( 1010 "Docker custom source definitions are not yet supported. " 1011 "Only YAML manifest-based custom sources are currently available." 1012 )
class CloudOrganization:
    """Information about an organization in Airbyte Cloud.

    This class provides lazy loading of organization attributes including billing status.
    It is typically created via CloudWorkspace.get_organization().
    """

    def __init__(
        self,
        organization_id: str,
        organization_name: str | None = None,
        email: str | None = None,
        *,
        api_root: str = api_util.CLOUD_API_ROOT,
        client_id: SecretString | None = None,
        client_secret: SecretString | None = None,
        bearer_token: SecretString | None = None,
    ) -> None:
        """Initialize a CloudOrganization.

        Args:
            organization_id: The organization ID.
            organization_name: Display name of the organization.
            email: Email associated with the organization.
            api_root: The API root URL.
            client_id: OAuth client ID for authentication.
            client_secret: OAuth client secret for authentication.
            bearer_token: Bearer token for authentication (alternative to client credentials).
        """
        self.organization_id = organization_id
        """The organization ID."""

        self._organization_name = organization_name
        """Display name of the organization."""

        self._email = email
        """Email associated with the organization."""

        # Credentials/endpoint are stored privately; used only by _fetch_organization_info.
        self._api_root = api_root
        self._client_id = client_id
        self._client_secret = client_secret
        self._bearer_token = bearer_token

        # Cached organization info (billing, etc.); populated lazily on first access.
        self._organization_info: dict[str, Any] | None = None
        # Flag to remember if fetching organization info failed (e.g., permission issues),
        # so property accesses don't re-trigger a doomed API call on every read.
        self._organization_info_fetch_failed: bool = False

    def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Fetch and cache organization info including billing status.

        If fetching fails (e.g., due to permission issues), the failure is cached and
        subsequent calls will return an empty dict without retrying.

        Args:
            force_refresh: If True, always fetch from the API even if cached.

        Returns:
            Dictionary containing organization info including billing data.
            Returns empty dict if fetching failed or is not permitted.
        """
        # Reset failure flag if force_refresh is requested
        if force_refresh:
            self._organization_info_fetch_failed = False

        # If we already know fetching failed, return empty dict without retrying.
        # NOTE(review): after a *failed* force_refresh, this branch also hides any
        # previously cached `_organization_info` until another force_refresh succeeds —
        # confirm that returning {} over stale data is the intended trade-off.
        if self._organization_info_fetch_failed:
            return {}

        if not force_refresh and self._organization_info is not None:
            return self._organization_info

        try:
            self._organization_info = api_util.get_organization_info(
                organization_id=self.organization_id,
                api_root=self._api_root,
                client_id=self._client_id,
                client_secret=self._client_secret,
                bearer_token=self._bearer_token,
            )
        except Exception:
            # Deliberate best-effort: cache the failure so we don't retry on
            # subsequent property accesses (billing info may simply not be visible
            # to workspace-scoped credentials).
            self._organization_info_fetch_failed = True
            return {}
        else:
            return self._organization_info

    @property
    def organization_name(self) -> str | None:
        """Display name of the organization."""
        if self._organization_name is not None:
            return self._organization_name
        # Try to fetch from API if not set (returns empty dict on failure)
        info = self._fetch_organization_info()
        return info.get("organizationName")

    @property
    def email(self) -> str | None:
        """Email associated with the organization."""
        if self._email is not None:
            return self._email
        # Try to fetch from API if not set (returns empty dict on failure)
        info = self._fetch_organization_info()
        return info.get("email")

    @property
    def payment_status(self) -> str | None:
        """Payment status of the organization.

        Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'.
        When 'disabled', syncs are blocked due to unpaid invoices.
        Returns None if billing info is not available (e.g., due to permission issues).
        """
        info = self._fetch_organization_info()
        # "billing" may be present-but-null in the payload; `or {}` guards both cases.
        return (info.get("billing") or {}).get("paymentStatus")

    @property
    def subscription_status(self) -> str | None:
        """Subscription status of the organization.

        Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'.
        Returns None if billing info is not available (e.g., due to permission issues).
        """
        info = self._fetch_organization_info()
        return (info.get("billing") or {}).get("subscriptionStatus")

    @property
    def is_account_locked(self) -> bool:
        """Whether the account is locked due to billing issues.

        Returns True if payment_status is 'disabled'/'locked' or subscription_status is
        'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.
        """
        return api_util.is_account_locked(self.payment_status, self.subscription_status)
Information about an organization in Airbyte Cloud.
This class provides lazy loading of organization attributes including billing status. It is typically created via CloudWorkspace.get_organization().
    def __init__(
        self,
        organization_id: str,
        organization_name: str | None = None,
        email: str | None = None,
        *,
        api_root: str = api_util.CLOUD_API_ROOT,
        client_id: SecretString | None = None,
        client_secret: SecretString | None = None,
        bearer_token: SecretString | None = None,
    ) -> None:
        """Initialize a CloudOrganization.

        Args:
            organization_id: The organization ID.
            organization_name: Display name of the organization.
            email: Email associated with the organization.
            api_root: The API root URL.
            client_id: OAuth client ID for authentication.
            client_secret: OAuth client secret for authentication.
            bearer_token: Bearer token for authentication (alternative to client credentials).
        """
        self.organization_id = organization_id
        """The organization ID."""

        self._organization_name = organization_name
        """Display name of the organization."""

        self._email = email
        """Email associated with the organization."""

        # Connection details kept private; consumed by the lazy API fetch helper.
        self._api_root = api_root
        self._client_id = client_id
        self._client_secret = client_secret
        self._bearer_token = bearer_token

        # Cached organization info (billing, etc.); None until first successful fetch.
        self._organization_info: dict[str, Any] | None = None
        # Flag to remember if fetching organization info failed (e.g., permission issues)
        self._organization_info_fetch_failed: bool = False
Initialize a CloudOrganization.
Arguments:
- organization_id: The organization ID.
- organization_name: Display name of the organization.
- email: Email associated with the organization.
- api_root: The API root URL.
- client_id: OAuth client ID for authentication.
- client_secret: OAuth client secret for authentication.
- bearer_token: Bearer token for authentication (alternative to client credentials).
161 @property 162 def organization_name(self) -> str | None: 163 """Display name of the organization.""" 164 if self._organization_name is not None: 165 return self._organization_name 166 # Try to fetch from API if not set (returns empty dict on failure) 167 info = self._fetch_organization_info() 168 return info.get("organizationName")
Display name of the organization.
170 @property 171 def email(self) -> str | None: 172 """Email associated with the organization.""" 173 if self._email is not None: 174 return self._email 175 # Try to fetch from API if not set (returns empty dict on failure) 176 info = self._fetch_organization_info() 177 return info.get("email")
Email associated with the organization.
179 @property 180 def payment_status(self) -> str | None: 181 """Payment status of the organization. 182 183 Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'. 184 When 'disabled', syncs are blocked due to unpaid invoices. 185 Returns None if billing info is not available (e.g., due to permission issues). 186 """ 187 info = self._fetch_organization_info() 188 return (info.get("billing") or {}).get("paymentStatus")
Payment status of the organization.
Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'. When 'disabled', syncs are blocked due to unpaid invoices. Returns None if billing info is not available (e.g., due to permission issues).
190 @property 191 def subscription_status(self) -> str | None: 192 """Subscription status of the organization. 193 194 Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'. 195 Returns None if billing info is not available (e.g., due to permission issues). 196 """ 197 info = self._fetch_organization_info() 198 return (info.get("billing") or {}).get("subscriptionStatus")
Subscription status of the organization.
Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'. Returns None if billing info is not available (e.g., due to permission issues).
200 @property 201 def is_account_locked(self) -> bool: 202 """Whether the account is locked due to billing issues. 203 204 Returns True if payment_status is 'disabled'/'locked' or subscription_status is 205 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state. 206 """ 207 return api_util.is_account_locked(self.payment_status, self.subscription_status)
Whether the account is locked due to billing issues.
Returns True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        client_id="...",
        client_secret="...",
    )
    ```

    Example with bearer token:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        bearer_token="...",
    )
    ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided (callers may pass plain str).
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
        - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
        - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
        - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
        - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
        - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first; it takes precedence when both are configured.
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str:
        """The web URL of the workspace."""
        # Always a str (annotation narrowed from `str | None`): the f-string below
        # cannot evaluate to None.
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        # cached_property: the API is hit at most once per CloudWorkspace instance.
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        # The returned object inherits this workspace's credentials for its own lazy fetches.
        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        # Result is discarded: only the side effect (auth + reachability check) matters.
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        # Derive the API's `sourceType` from the connector name, e.g. "source-faker" -> "faker".
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            # NOTE(review): check-then-create is not atomic; a concurrent deploy with the
            # same name could still slip through between these two calls.
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination ID.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            # Derive `destinationType` from the connector name, e.g. "destination-duckdb" -> "duckdb".
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            # Raw config dicts must declare their own type; copy to avoid mutating caller input.
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            # Name is only available from an object; a bare ID passes None (safe-mode
            # name check is presumably handled server-side/downstream in that case).
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # NOTE(review): uses `.destination_id` here while the source path uses
            # `.connector_id` — presumably an alias on CloudDestination; confirm.
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        # Normalize object-or-ID inputs down to plain IDs for the API call.
        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            # Empty string means "no prefix" for the API helper.
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        # Promote a bare ID to a lazy CloudConnection so we can read name/source/destination.
        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        # Cascade deletes run AFTER the connection itself is removed.
        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            # `name` was already passed to the API; this re-check enforces an exact
            # match client-side as well.
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            # Exact-match re-check on top of the server-side filter.
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            # Exact-match re-check on top of the server-side filter.
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        # XOR check: exactly one of the two connector flavors must be specified.
        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            # Normalize the manifest input (Path -> file contents, str -> parsed YAML,
            # dict -> as-is) into a plain dict for the API.
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                # Unreachable in practice (is_yaml implies manifest_yaml is not None),
                # kept for exhaustiveness and type-narrowing.
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
Two authentication methods are supported (mutually exclusive):
- OAuth2 client credentials (client_id + client_secret)
- Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace( workspace_id="...", client_id="...", client_secret="...", )
Example with bearer token:
workspace = CloudWorkspace( workspace_id="...", bearer_token="...", )
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
) -> CloudWorkspace:
    """Build a `CloudWorkspace` from environment-variable credentials.

    Credentials are resolved from the environment rather than passed
    explicitly. Two mutually exclusive authentication schemes are tried:

    1. Bearer token (checked first).
    2. OAuth2 client credentials (fallback).

    Environment variables used:
    - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
    - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
    - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
    - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
    - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

    Args:
        workspace_id: The workspace ID. Falls back to the
            `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable when omitted.
        api_root: The API root URL. Falls back to the
            `AIRBYTE_CLOUD_API_URL` environment variable, then to the
            Airbyte Cloud default.

    Returns:
        A CloudWorkspace instance configured with credentials from the environment.

    Raises:
        PyAirbyteSecretNotFoundError: If required credentials are not found in
            the environment.

    Example:
        ```python
        # With workspace_id from environment
        workspace = CloudWorkspace.from_env()

        # With explicit workspace_id
        workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
        ```
    """
    resolved_api_root = resolve_cloud_api_url(api_root)
    resolved_workspace_id = resolve_cloud_workspace_id(workspace_id)

    # Bearer token takes precedence when present.
    bearer_token = resolve_cloud_bearer_token()
    if bearer_token:
        return cls(
            workspace_id=resolved_workspace_id,
            bearer_token=bearer_token,
            api_root=resolved_api_root,
        )

    # Otherwise fall back to the OAuth2 client-credentials flow.
    return cls(
        workspace_id=resolved_workspace_id,
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=resolved_api_root,
    )
Create a CloudWorkspace using credentials from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.
Two authentication methods are supported (mutually exclusive):
- Bearer token (checked first)
- OAuth2 client credentials (fallback)
Environment variables used:
- AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
- AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
- AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
- AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
- AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
Arguments:
- workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
- api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:
A CloudWorkspace instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
332 @property 333 def workspace_url(self) -> str | None: 334 """The web URL of the workspace.""" 335 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
The web URL of the workspace.
369 def get_organization( 370 self, 371 *, 372 raise_on_error: bool = True, 373 ) -> CloudOrganization | None: 374 """Get the organization this workspace belongs to. 375 376 Fetching organization info requires ORGANIZATION_READER permissions on the organization, 377 which may not be available with workspace-scoped credentials. 378 379 Args: 380 raise_on_error: If True (default), raises AirbyteError on permission or API errors. 381 If False, returns None instead of raising. 382 383 Returns: 384 CloudOrganization object with organization_id and organization_name, 385 or None if raise_on_error=False and an error occurred. 386 387 Raises: 388 AirbyteError: If raise_on_error=True and the organization info cannot be fetched 389 (e.g., due to insufficient permissions or missing data). 390 """ 391 try: 392 info = self._organization_info 393 except (AirbyteError, NotImplementedError): 394 if raise_on_error: 395 raise 396 return None 397 398 organization_id = info.get("organizationId") 399 organization_name = info.get("organizationName") 400 401 # Validate that both organization_id and organization_name are non-null and non-empty 402 if not organization_id or not organization_name: 403 if raise_on_error: 404 raise AirbyteError( 405 message="Organization info is incomplete.", 406 context={ 407 "organization_id": organization_id, 408 "organization_name": organization_name, 409 }, 410 ) 411 return None 412 413 return CloudOrganization( 414 organization_id=organization_id, 415 organization_name=organization_name, 416 api_root=self.api_root, 417 client_id=self.client_id, 418 client_secret=self.client_secret, 419 bearer_token=self.bearer_token, 420 )
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
424 def connect(self) -> None: 425 """Check that the workspace is reachable and raise an exception otherwise. 426 427 Note: It is not necessary to call this method before calling other operations. It 428 serves primarily as a simple check to ensure that the workspace is reachable 429 and credentials are correct. 430 """ 431 _ = api_util.get_workspace( 432 api_root=self.api_root, 433 workspace_id=self.workspace_id, 434 client_id=self.client_id, 435 client_secret=self.client_secret, 436 bearer_token=self.bearer_token, 437 ) 438 print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
442 def get_connection( 443 self, 444 connection_id: str, 445 ) -> CloudConnection: 446 """Get a connection by ID. 447 448 This method does not fetch data from the API. It returns a `CloudConnection` object, 449 which will be loaded lazily as needed. 450 """ 451 return CloudConnection( 452 workspace=self, 453 connection_id=connection_id, 454 )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
456 def get_source( 457 self, 458 source_id: str, 459 ) -> CloudSource: 460 """Get a source by ID. 461 462 This method does not fetch data from the API. It returns a `CloudSource` object, 463 which will be loaded lazily as needed. 464 """ 465 return CloudSource( 466 workspace=self, 467 connector_id=source_id, 468 )
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
470 def get_destination( 471 self, 472 destination_id: str, 473 ) -> CloudDestination: 474 """Get a destination by ID. 475 476 This method does not fetch data from the API. It returns a `CloudDestination` object, 477 which will be loaded lazily as needed. 478 """ 479 return CloudDestination( 480 workspace=self, 481 connector_id=destination_id, 482 )
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
486 def deploy_source( 487 self, 488 name: str, 489 source: Source, 490 *, 491 unique: bool = True, 492 random_name_suffix: bool = False, 493 ) -> CloudSource: 494 """Deploy a source to the workspace. 495 496 Returns the newly deployed source. 497 498 Args: 499 name: The name to use when deploying. 500 source: The source object to deploy. 501 unique: Whether to require a unique name. If `True`, duplicate names 502 are not allowed. Defaults to `True`. 503 random_name_suffix: Whether to append a random suffix to the name. 504 """ 505 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 506 source_config_dict["sourceType"] = source.name.replace("source-", "") 507 508 if random_name_suffix: 509 name += f" (ID: {text_util.generate_random_suffix()})" 510 511 if unique: 512 existing = self.list_sources(name=name) 513 if existing: 514 raise exc.AirbyteDuplicateResourcesError( 515 resource_type="source", 516 resource_name=name, 517 ) 518 519 deployed_source = api_util.create_source( 520 name=name, 521 api_root=self.api_root, 522 workspace_id=self.workspace_id, 523 config=source_config_dict, 524 client_id=self.client_id, 525 client_secret=self.client_secret, 526 bearer_token=self.bearer_token, 527 ) 528 return CloudSource( 529 workspace=self, 530 connector_id=deployed_source.source_id, 531 )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
533 def deploy_destination( 534 self, 535 name: str, 536 destination: Destination | dict[str, Any], 537 *, 538 unique: bool = True, 539 random_name_suffix: bool = False, 540 ) -> CloudDestination: 541 """Deploy a destination to the workspace. 542 543 Returns the newly deployed destination ID. 544 545 Args: 546 name: The name to use when deploying. 547 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 548 dictionary of configuration values. 549 unique: Whether to require a unique name. If `True`, duplicate names 550 are not allowed. Defaults to `True`. 551 random_name_suffix: Whether to append a random suffix to the name. 552 """ 553 if isinstance(destination, Destination): 554 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 555 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 556 # raise ValueError(destination_conf_dict) 557 else: 558 destination_conf_dict = destination.copy() 559 if "destinationType" not in destination_conf_dict: 560 raise exc.PyAirbyteInputError( 561 message="Missing `destinationType` in configuration dictionary.", 562 ) 563 564 if random_name_suffix: 565 name += f" (ID: {text_util.generate_random_suffix()})" 566 567 if unique: 568 existing = self.list_destinations(name=name) 569 if existing: 570 raise exc.AirbyteDuplicateResourcesError( 571 resource_type="destination", 572 resource_name=name, 573 ) 574 575 deployed_destination = api_util.create_destination( 576 name=name, 577 api_root=self.api_root, 578 workspace_id=self.workspace_id, 579 config=destination_conf_dict, # Wants a dataclass but accepts dict 580 client_id=self.client_id, 581 client_secret=self.client_secret, 582 bearer_token=self.bearer_token, 583 ) 584 return CloudDestination( 585 workspace=self, 586 connector_id=deployed_destination.destination_id, 587 )
Deploy a destination to the workspace.
Returns the newly deployed destination object.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
589 def permanently_delete_source( 590 self, 591 source: str | CloudSource, 592 *, 593 safe_mode: bool = True, 594 ) -> None: 595 """Delete a source from the workspace. 596 597 You can pass either the source ID `str` or a deployed `Source` object. 598 599 Args: 600 source: The source ID or CloudSource object to delete 601 safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" 602 (case insensitive) to prevent accidental deletion. Defaults to True. 603 """ 604 if not isinstance(source, (str, CloudSource)): 605 raise exc.PyAirbyteInputError( 606 message="Invalid source type.", 607 input_value=type(source).__name__, 608 ) 609 610 api_util.delete_source( 611 source_id=source.connector_id if isinstance(source, CloudSource) else source, 612 source_name=source.name if isinstance(source, CloudSource) else None, 613 api_root=self.api_root, 614 client_id=self.client_id, 615 client_secret=self.client_secret, 616 bearer_token=self.bearer_token, 617 safe_mode=safe_mode, 618 )
Delete a source from the workspace.
You can pass either the source ID str or a deployed Source object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
622 def permanently_delete_destination( 623 self, 624 destination: str | CloudDestination, 625 *, 626 safe_mode: bool = True, 627 ) -> None: 628 """Delete a deployed destination from the workspace. 629 630 You can pass either the `Cache` class or the deployed destination ID as a `str`. 631 632 Args: 633 destination: The destination ID or CloudDestination object to delete 634 safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" 635 (case insensitive) to prevent accidental deletion. Defaults to True. 636 """ 637 if not isinstance(destination, (str, CloudDestination)): 638 raise exc.PyAirbyteInputError( 639 message="Invalid destination type.", 640 input_value=type(destination).__name__, 641 ) 642 643 api_util.delete_destination( 644 destination_id=( 645 destination if isinstance(destination, str) else destination.destination_id 646 ), 647 destination_name=( 648 destination.name if isinstance(destination, CloudDestination) else None 649 ), 650 api_root=self.api_root, 651 client_id=self.client_id, 652 client_secret=self.client_secret, 653 bearer_token=self.bearer_token, 654 safe_mode=safe_mode, 655 )
Delete a deployed destination from the workspace.
You can pass either a CloudDestination object or the deployed destination ID as a str.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
659 def deploy_connection( 660 self, 661 connection_name: str, 662 *, 663 source: CloudSource | str, 664 selected_streams: list[str], 665 destination: CloudDestination | str, 666 table_prefix: str | None = None, 667 ) -> CloudConnection: 668 """Create a new connection between an already deployed source and destination. 669 670 Returns the newly deployed connection object. 671 672 Args: 673 connection_name: The name of the connection. 674 source: The deployed source. You can pass a source ID or a CloudSource object. 675 destination: The deployed destination. You can pass a destination ID or a 676 CloudDestination object. 677 table_prefix: Optional. The table prefix to use when syncing to the destination. 678 selected_streams: The selected stream names to sync within the connection. 679 """ 680 if not selected_streams: 681 raise exc.PyAirbyteInputError( 682 guidance="You must provide `selected_streams` when creating a connection." 683 ) 684 685 source_id: str = source if isinstance(source, str) else source.connector_id 686 destination_id: str = ( 687 destination if isinstance(destination, str) else destination.connector_id 688 ) 689 690 deployed_connection = api_util.create_connection( 691 name=connection_name, 692 source_id=source_id, 693 destination_id=destination_id, 694 api_root=self.api_root, 695 workspace_id=self.workspace_id, 696 selected_stream_names=selected_streams, 697 prefix=table_prefix or "", 698 client_id=self.client_id, 699 client_secret=self.client_secret, 700 bearer_token=self.bearer_token, 701 ) 702 703 return CloudConnection( 704 workspace=self, 705 connection_id=deployed_connection.connection_id, 706 source=deployed_connection.source_id, 707 destination=deployed_connection.destination_id, 708 )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
710 def permanently_delete_connection( 711 self, 712 connection: str | CloudConnection, 713 *, 714 cascade_delete_source: bool = False, 715 cascade_delete_destination: bool = False, 716 safe_mode: bool = True, 717 ) -> None: 718 """Delete a deployed connection from the workspace. 719 720 Args: 721 connection: The connection ID or CloudConnection object to delete 722 cascade_delete_source: If True, also delete the source after deleting the connection 723 cascade_delete_destination: If True, also delete the destination after deleting 724 the connection 725 safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" 726 (case insensitive) to prevent accidental deletion. Defaults to True. Also applies 727 to cascade deletes. 728 """ 729 if connection is None: 730 raise ValueError("No connection ID provided.") 731 732 if isinstance(connection, str): 733 connection = CloudConnection( 734 workspace=self, 735 connection_id=connection, 736 ) 737 738 api_util.delete_connection( 739 connection_id=connection.connection_id, 740 connection_name=connection.name, 741 api_root=self.api_root, 742 workspace_id=self.workspace_id, 743 client_id=self.client_id, 744 client_secret=self.client_secret, 745 bearer_token=self.bearer_token, 746 safe_mode=safe_mode, 747 ) 748 749 if cascade_delete_source: 750 self.permanently_delete_source( 751 source=connection.source_id, 752 safe_mode=safe_mode, 753 ) 754 if cascade_delete_destination: 755 self.permanently_delete_destination( 756 destination=connection.destination_id, 757 safe_mode=safe_mode, 758 )
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
762 def list_connections( 763 self, 764 name: str | None = None, 765 *, 766 name_filter: Callable | None = None, 767 ) -> list[CloudConnection]: 768 """List connections by name in the workspace. 769 770 TODO: Add pagination support 771 """ 772 connections = api_util.list_connections( 773 api_root=self.api_root, 774 workspace_id=self.workspace_id, 775 name=name, 776 name_filter=name_filter, 777 client_id=self.client_id, 778 client_secret=self.client_secret, 779 bearer_token=self.bearer_token, 780 ) 781 return [ 782 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 783 workspace=self, 784 connection_response=connection, 785 ) 786 for connection in connections 787 if name is None or connection.name == name 788 ]
List connections by name in the workspace.
TODO: Add pagination support
790 def list_sources( 791 self, 792 name: str | None = None, 793 *, 794 name_filter: Callable | None = None, 795 ) -> list[CloudSource]: 796 """List all sources in the workspace. 797 798 TODO: Add pagination support 799 """ 800 sources = api_util.list_sources( 801 api_root=self.api_root, 802 workspace_id=self.workspace_id, 803 name=name, 804 name_filter=name_filter, 805 client_id=self.client_id, 806 client_secret=self.client_secret, 807 bearer_token=self.bearer_token, 808 ) 809 return [ 810 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 811 workspace=self, 812 source_response=source, 813 ) 814 for source in sources 815 if name is None or source.name == name 816 ]
List all sources in the workspace.
TODO: Add pagination support
818 def list_destinations( 819 self, 820 name: str | None = None, 821 *, 822 name_filter: Callable | None = None, 823 ) -> list[CloudDestination]: 824 """List all destinations in the workspace. 825 826 TODO: Add pagination support 827 """ 828 destinations = api_util.list_destinations( 829 api_root=self.api_root, 830 workspace_id=self.workspace_id, 831 name=name, 832 name_filter=name_filter, 833 client_id=self.client_id, 834 client_secret=self.client_secret, 835 bearer_token=self.bearer_token, 836 ) 837 return [ 838 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 839 workspace=self, 840 destination_response=destination, 841 ) 842 for destination in destinations 843 if name is None or destination.name == name 844 ]
List all destinations in the workspace.
TODO: Add pagination support
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
        AirbyteDuplicateResourcesError: If unique=True and name already exists
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    # Exactly one connector flavor must be selected.
    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing_names = {
            definition.name
            for definition in self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
        }
        if name in existing_names:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if not is_yaml:
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    # Normalize the manifest input into a plain dict.
    manifest_dict: dict[str, Any]
    if isinstance(manifest_yaml, Path):
        manifest_dict = yaml.safe_load(manifest_yaml.read_text())
    elif isinstance(manifest_yaml, str):
        manifest_dict = yaml.safe_load(manifest_yaml)
    elif manifest_yaml is not None:
        manifest_dict = manifest_yaml
    else:
        raise exc.PyAirbyteInputError(
            message="manifest_yaml is required for YAML connectors",
            context={"name": name},
        )

    if pre_validate:
        api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

    response = api_util.create_custom_yaml_source_definition(
        name=name,
        workspace_id=self.workspace_id,
        manifest=manifest_dict,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
        self, response
    )

    # Store testing values so the Builder UI can run test reads immediately.
    if testing_values is not None:
        custom_definition.set_testing_values(testing_values)

    return custom_definition
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
953 def list_custom_source_definitions( 954 self, 955 *, 956 definition_type: Literal["yaml", "docker"], 957 ) -> list[CustomCloudSourceDefinition]: 958 """List custom source connector definitions. 959 960 Args: 961 definition_type: Connector type to list ("yaml" or "docker"). Required. 962 963 Returns: 964 List of CustomCloudSourceDefinition objects matching the specified type 965 """ 966 if definition_type == "yaml": 967 yaml_definitions = api_util.list_custom_yaml_source_definitions( 968 workspace_id=self.workspace_id, 969 api_root=self.api_root, 970 client_id=self.client_id, 971 client_secret=self.client_secret, 972 bearer_token=self.bearer_token, 973 ) 974 return [ 975 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 976 for d in yaml_definitions 977 ] 978 979 raise NotImplementedError( 980 "Docker custom source definitions are not yet supported. " 981 "Only YAML manifest-based custom sources are currently available." 982 )
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
984 def get_custom_source_definition( 985 self, 986 definition_id: str, 987 *, 988 definition_type: Literal["yaml", "docker"], 989 ) -> CustomCloudSourceDefinition: 990 """Get a specific custom source definition by ID. 991 992 Args: 993 definition_id: The definition ID 994 definition_type: Connector type ("yaml" or "docker"). Required. 995 996 Returns: 997 CustomCloudSourceDefinition object 998 """ 999 if definition_type == "yaml": 1000 result = api_util.get_custom_yaml_source_definition( 1001 workspace_id=self.workspace_id, 1002 definition_id=definition_id, 1003 api_root=self.api_root, 1004 client_id=self.client_id, 1005 client_secret=self.client_secret, 1006 bearer_token=self.bearer_token, 1007 ) 1008 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 1009 1010 raise NotImplementedError( 1011 "Docker custom source definitions are not yet supported. " 1012 "Only YAML manifest-based custom sources are currently available." 1013 )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object