airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
Usage Examples
Get a new workspace object and deploy a source to it:
```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.

## Usage Examples

Get a new workspace object and deploy a source to it:

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
"""

from __future__ import annotations

from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, overload

import yaml

from airbyte import exceptions as exc
from airbyte._util import api_util, text_util
from airbyte._util.api_util import get_web_url_root
from airbyte.cloud._credentials import _AirbyteCredentials
from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import (
    CloudDestination,
    CloudSource,
    CustomCloudSourceDefinition,
)
from airbyte.cloud.models import CloudWorkspaceInfo
from airbyte.cloud.organizations import CloudOrganization
from airbyte.destinations.base import Destination
from airbyte.exceptions import AirbyteError


if TYPE_CHECKING:
    from collections.abc import Callable

    from airbyte.secrets.base import SecretString
    from airbyte.sources.base import Source


@dataclass(init=False, kw_only=True)  # noqa: PLR0904  # Core cloud API facade.
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        client_id="...",
        client_secret="...",
    )
    ```

    Example with bearer token:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        bearer_token="...",
    )
    ```
    """

    workspace_id: str
    client_id: SecretString | None
    client_secret: SecretString | None
    api_root: str
    config_api_root: str | None
    """The Config API root URL."""
    bearer_token: SecretString | None

    # Internal credentials objects (set in __init__, excluded from repr)
    _credentials: _AirbyteCredentials = field(init=False, repr=False)
    _client_config: CloudClientConfig = field(init=False, repr=False)

    def __init__(
        self,
        *,
        workspace_id: str | None = None,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        api_root: str | None = None,
        config_api_root: str | None = None,
        bearer_token: str | SecretString | None = None,
    ) -> None:
        """Validate and initialize credentials.

        Args:
            workspace_id: The workspace ID.
            client_id: OAuth2 client ID (client credentials flow).
            client_secret: OAuth2 client secret (client credentials flow).
            api_root: The public API root URL.
            config_api_root: The Config API root URL.
            bearer_token: Bearer token (alternative to client credentials).

        Raises:
            PyAirbyteInputError: If no workspace ID could be resolved.
        """
        # `env_vars` is only switched on when the caller passed no explicit credential at
        # all; `from_env()` relies on this by calling the constructor without credentials.
        env_vars = not (client_id or client_secret or bearer_token)
        credentials = _AirbyteCredentials.from_auth(
            workspace_id=workspace_id,
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=api_root,
            config_api_root=config_api_root,
            env_vars=env_vars,
        )
        if not credentials.workspace_id:
            raise exc.PyAirbyteInputError(
                message="Workspace ID is required.",
                guidance="Provide a workspace ID.",
            )

        self._credentials = credentials
        self.workspace_id = credentials.workspace_id or ""
        self.client_id = credentials.client_id
        self.client_secret = credentials.client_secret
        self.bearer_token = credentials.bearer_token
        self.api_root = credentials.public_api_root
        self.config_api_root = credentials.config_api_root

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._client_config = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
            config_api_root=self.config_api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteInputError: If required credentials are not found in
                the environment or are incomplete.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        return cls(
            workspace_id=workspace_id,
            api_root=api_root,
            config_api_root=config_api_root,
        )

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            config_api_root=self.config_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        organization_credentials = self._credentials.with_organization_id(organization_id)
        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.

        Raises:
            AirbyteDuplicateResourcesError: If `unique` is `True` and a source with the
                same name already exists in the workspace.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        # Strip only the leading "source-" prefix; `str.replace` would also remove any
        # later occurrence of the substring inside the connector name.
        source_config_dict["sourceType"] = source.name.removeprefix("source-")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.

        Raises:
            PyAirbyteInputError: If a configuration dictionary is passed without a
                `destinationType` key.
            AirbyteDuplicateResourcesError: If `unique` is `True` and a destination with the
                same name already exists in the workspace.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            # Strip only the leading "destination-" prefix (see `deploy_source`).
            destination_conf_dict["destinationType"] = destination.name.removeprefix(
                "destination-"
            )
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.

        Raises:
            PyAirbyteInputError: If `source` is neither a `str` nor a `CloudSource`.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            # The name is only known up front when a CloudSource object was passed.
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the destination ID `str` or a deployed `CloudDestination` object.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.

        Raises:
            PyAirbyteInputError: If `destination` is neither a `str` nor a `CloudDestination`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination.connector_id
                if isinstance(destination, CloudDestination)
                else destination
            ),
            # The name is only known up front when a CloudDestination object was passed.
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.

        Raises:
            PyAirbyteInputError: If `selected_streams` is empty.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.

        Raises:
            ValueError: If `connection` is `None`.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List workspaces, sources, destinations, and connections

    def list_workspaces(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        """List workspaces available to the current credentials, with an optional limit."""
        return [
            CloudWorkspaceInfo.from_api_response(workspace)
            for workspace in api_util.list_workspaces(
                workspace_id="",
                api_root=self.api_root,
                name=name,
                name_filter=name_filter,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                limit=limit,
            )
        ]

    def rename(
        self,
        name: str,
    ) -> CloudWorkspace:
        """Rename this workspace.

        Returns this same `CloudWorkspace` object.
        """
        api_util.rename_workspace(
            workspace_id=self.workspace_id,
            name=name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return self

    def permanently_delete(
        self,
        *,
        workspace_name: str | None = None,
        safe_mode: bool = True,
    ) -> None:
        """Permanently delete this workspace if it has no connections.

        When `safe_mode` is enabled, the workspace name must contain `delete-me`
        or `deleteme`. This also checks for existing connections before deleting
        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
        """
        api_util.permanently_delete_workspace(
            workspace_id=self.workspace_id,
            workspace_name=workspace_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace, with an optional limit."""
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace, with an optional limit."""
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace, with an optional limit."""
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided,
                if docker_image is given without docker_tag, or if the manifest does not
                parse to a dictionary.
            AirbyteDuplicateResourcesError: If unique=True and name already exists
            NotImplementedError: For Docker connectors, which are not yet supported.
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        # Exactly one of the two connector flavors must be selected.
        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                # Read as UTF-8 explicitly so parsing does not depend on the platform's
                # default locale encoding.
                manifest_dict = yaml.safe_load(manifest_yaml.read_text(encoding="utf-8"))
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            # `yaml.safe_load` returns `None` for empty input and scalars/lists for
            # non-mapping documents; fail early instead of sending those to the API.
            if not isinstance(manifest_dict, dict):
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml must resolve to a dictionary.",
                    context={
                        "name": name,
                        "parsed_type": type(manifest_dict).__name__,
                    },
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type

        Raises:
            NotImplementedError: If `definition_type` is anything other than "yaml".
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object

        Raises:
            NotImplementedError: If `definition_type` is anything other than "yaml".
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
70@dataclass(init=False, kw_only=True) # noqa: PLR0904 # Core cloud API facade. 71class CloudWorkspace: 72 """A remote workspace on the Airbyte Cloud. 73 74 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 75 instances, both OSS and Enterprise. 76 77 Two authentication methods are supported (mutually exclusive): 78 1. OAuth2 client credentials (client_id + client_secret) 79 2. Bearer token authentication 80 81 Example with client credentials: 82 ```python 83 workspace = CloudWorkspace( 84 workspace_id="...", 85 client_id="...", 86 client_secret="...", 87 ) 88 ``` 89 90 Example with bearer token: 91 ```python 92 workspace = CloudWorkspace( 93 workspace_id="...", 94 bearer_token="...", 95 ) 96 ``` 97 """ 98 99 workspace_id: str 100 client_id: SecretString | None 101 client_secret: SecretString | None 102 api_root: str 103 config_api_root: str | None 104 """The Config API root URL.""" 105 bearer_token: SecretString | None 106 107 # Internal credentials objects (set in __init__, excluded from repr) 108 _credentials: _AirbyteCredentials = field(init=False, repr=False) 109 _client_config: CloudClientConfig = field(init=False, repr=False) 110 111 def __init__( 112 self, 113 *, 114 workspace_id: str | None = None, 115 client_id: str | SecretString | None = None, 116 client_secret: str | SecretString | None = None, 117 api_root: str | None = None, 118 config_api_root: str | None = None, 119 bearer_token: str | SecretString | None = None, 120 ) -> None: 121 """Validate and initialize credentials.""" 122 env_vars = not (client_id or client_secret or bearer_token) 123 credentials = _AirbyteCredentials.from_auth( 124 workspace_id=workspace_id, 125 client_id=client_id, 126 client_secret=client_secret, 127 bearer_token=bearer_token, 128 public_api_root=api_root, 129 config_api_root=config_api_root, 130 env_vars=env_vars, 131 ) 132 if not credentials.workspace_id: 133 raise exc.PyAirbyteInputError( 134 message="Workspace ID is required.", 135 
guidance="Provide a workspace ID.", 136 ) 137 138 self._credentials = credentials 139 self.workspace_id = credentials.workspace_id or "" 140 self.client_id = credentials.client_id 141 self.client_secret = credentials.client_secret 142 self.bearer_token = credentials.bearer_token 143 self.api_root = credentials.public_api_root 144 self.config_api_root = credentials.config_api_root 145 146 # Create internal CloudClientConfig object (validates mutual exclusivity) 147 self._client_config = CloudClientConfig( 148 client_id=self.client_id, 149 client_secret=self.client_secret, 150 bearer_token=self.bearer_token, 151 api_root=self.api_root, 152 config_api_root=self.config_api_root, 153 ) 154 155 @classmethod 156 def from_env( 157 cls, 158 workspace_id: str | None = None, 159 *, 160 api_root: str | None = None, 161 config_api_root: str | None = None, 162 ) -> CloudWorkspace: 163 """Create a CloudWorkspace using credentials from environment variables. 164 165 This factory method resolves credentials from environment variables, 166 providing a convenient way to create a workspace without explicitly 167 passing credentials. 168 169 Two authentication methods are supported (mutually exclusive): 170 1. Bearer token (checked first) 171 2. OAuth2 client credentials (fallback) 172 173 Environment variables used: 174 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 175 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 176 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 177 - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument). 178 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 179 - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL. 180 181 Args: 182 workspace_id: The workspace ID. If not provided, will be resolved from 183 the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable. 184 api_root: The API root URL. 
If not provided, will be resolved from 185 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 186 the Airbyte Cloud API. 187 config_api_root: The Config API root URL. If not provided, will be resolved 188 from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable. 189 190 Returns: 191 A CloudWorkspace instance configured with credentials from the environment. 192 193 Raises: 194 PyAirbyteInputError: If required credentials are not found in 195 the environment or are incomplete. 196 197 Example: 198 ```python 199 # With workspace_id from environment 200 workspace = CloudWorkspace.from_env() 201 202 # With explicit workspace_id 203 workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id") 204 ``` 205 """ 206 return cls( 207 workspace_id=workspace_id, 208 api_root=api_root, 209 config_api_root=config_api_root, 210 ) 211 212 @property 213 def workspace_url(self) -> str | None: 214 """The web URL of the workspace.""" 215 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}" 216 217 @cached_property 218 def _organization_info(self) -> dict[str, Any]: 219 """Fetch and cache organization info for this workspace. 220 221 Uses the Config API endpoint for an efficient O(1) lookup. 222 This is an internal method; use get_organization() for public access. 223 """ 224 return api_util.get_workspace_organization_info( 225 workspace_id=self.workspace_id, 226 api_root=self.api_root, 227 config_api_root=self.config_api_root, 228 client_id=self.client_id, 229 client_secret=self.client_secret, 230 bearer_token=self.bearer_token, 231 ) 232 233 @overload 234 def get_organization(self) -> CloudOrganization: ... 235 236 @overload 237 def get_organization( 238 self, 239 *, 240 raise_on_error: Literal[True], 241 ) -> CloudOrganization: ... 242 243 @overload 244 def get_organization( 245 self, 246 *, 247 raise_on_error: Literal[False], 248 ) -> CloudOrganization | None: ... 
249 250 def get_organization( 251 self, 252 *, 253 raise_on_error: bool = True, 254 ) -> CloudOrganization | None: 255 """Get the organization this workspace belongs to. 256 257 Fetching organization info requires ORGANIZATION_READER permissions on the organization, 258 which may not be available with workspace-scoped credentials. 259 260 Args: 261 raise_on_error: If True (default), raises AirbyteError on permission or API errors. 262 If False, returns None instead of raising. 263 264 Returns: 265 CloudOrganization object with organization_id and organization_name, 266 or None if raise_on_error=False and an error occurred. 267 268 Raises: 269 AirbyteError: If raise_on_error=True and the organization info cannot be fetched 270 (e.g., due to insufficient permissions or missing data). 271 """ 272 try: 273 info = self._organization_info 274 except (AirbyteError, NotImplementedError): 275 if raise_on_error: 276 raise 277 return None 278 279 organization_id = info.get("organizationId") 280 organization_name = info.get("organizationName") 281 282 # Validate that both organization_id and organization_name are non-null and non-empty 283 if not organization_id or not organization_name: 284 if raise_on_error: 285 raise AirbyteError( 286 message="Organization info is incomplete.", 287 context={ 288 "organization_id": organization_id, 289 "organization_name": organization_name, 290 }, 291 ) 292 return None 293 294 organization_credentials = self._credentials.with_organization_id(organization_id) 295 return CloudOrganization( 296 organization_id=organization_id, 297 organization_name=organization_name, 298 client_id=organization_credentials.client_id, 299 client_secret=organization_credentials.client_secret, 300 bearer_token=organization_credentials.bearer_token, 301 public_api_root=organization_credentials.public_api_root, 302 config_api_root=organization_credentials.config_api_root, 303 ) 304 305 # Test connection and creds 306 307 def connect(self) -> None: 308 """Check that 
the workspace is reachable and raise an exception otherwise. 309 310 Note: It is not necessary to call this method before calling other operations. It 311 serves primarily as a simple check to ensure that the workspace is reachable 312 and credentials are correct. 313 """ 314 _ = api_util.get_workspace( 315 api_root=self.api_root, 316 workspace_id=self.workspace_id, 317 client_id=self.client_id, 318 client_secret=self.client_secret, 319 bearer_token=self.bearer_token, 320 ) 321 print(f"Successfully connected to workspace: {self.workspace_url}") 322 323 # Get sources, destinations, and connections 324 325 def get_connection( 326 self, 327 connection_id: str, 328 ) -> CloudConnection: 329 """Get a connection by ID. 330 331 This method does not fetch data from the API. It returns a `CloudConnection` object, 332 which will be loaded lazily as needed. 333 """ 334 return CloudConnection( 335 workspace=self, 336 connection_id=connection_id, 337 ) 338 339 def get_source( 340 self, 341 source_id: str, 342 ) -> CloudSource: 343 """Get a source by ID. 344 345 This method does not fetch data from the API. It returns a `CloudSource` object, 346 which will be loaded lazily as needed. 347 """ 348 return CloudSource( 349 workspace=self, 350 connector_id=source_id, 351 ) 352 353 def get_destination( 354 self, 355 destination_id: str, 356 ) -> CloudDestination: 357 """Get a destination by ID. 358 359 This method does not fetch data from the API. It returns a `CloudDestination` object, 360 which will be loaded lazily as needed. 361 """ 362 return CloudDestination( 363 workspace=self, 364 connector_id=destination_id, 365 ) 366 367 # Deploy sources and destinations 368 369 def deploy_source( 370 self, 371 name: str, 372 source: Source, 373 *, 374 unique: bool = True, 375 random_name_suffix: bool = False, 376 ) -> CloudSource: 377 """Deploy a source to the workspace. 378 379 Returns the newly deployed source. 380 381 Args: 382 name: The name to use when deploying. 
383 source: The source object to deploy. 384 unique: Whether to require a unique name. If `True`, duplicate names 385 are not allowed. Defaults to `True`. 386 random_name_suffix: Whether to append a random suffix to the name. 387 """ 388 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 389 source_config_dict["sourceType"] = source.name.replace("source-", "") 390 391 if random_name_suffix: 392 name += f" (ID: {text_util.generate_random_suffix()})" 393 394 if unique: 395 existing = self.list_sources(name=name) 396 if existing: 397 raise exc.AirbyteDuplicateResourcesError( 398 resource_type="source", 399 resource_name=name, 400 ) 401 402 deployed_source = api_util.create_source( 403 name=name, 404 api_root=self.api_root, 405 workspace_id=self.workspace_id, 406 config=source_config_dict, 407 client_id=self.client_id, 408 client_secret=self.client_secret, 409 bearer_token=self.bearer_token, 410 ) 411 return CloudSource( 412 workspace=self, 413 connector_id=deployed_source.source_id, 414 ) 415 416 def deploy_destination( 417 self, 418 name: str, 419 destination: Destination | dict[str, Any], 420 *, 421 unique: bool = True, 422 random_name_suffix: bool = False, 423 ) -> CloudDestination: 424 """Deploy a destination to the workspace. 425 426 Returns the newly deployed destination ID. 427 428 Args: 429 name: The name to use when deploying. 430 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 431 dictionary of configuration values. 432 unique: Whether to require a unique name. If `True`, duplicate names 433 are not allowed. Defaults to `True`. 434 random_name_suffix: Whether to append a random suffix to the name. 
435 """ 436 if isinstance(destination, Destination): 437 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 438 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 439 # raise ValueError(destination_conf_dict) 440 else: 441 destination_conf_dict = destination.copy() 442 if "destinationType" not in destination_conf_dict: 443 raise exc.PyAirbyteInputError( 444 message="Missing `destinationType` in configuration dictionary.", 445 ) 446 447 if random_name_suffix: 448 name += f" (ID: {text_util.generate_random_suffix()})" 449 450 if unique: 451 existing = self.list_destinations(name=name) 452 if existing: 453 raise exc.AirbyteDuplicateResourcesError( 454 resource_type="destination", 455 resource_name=name, 456 ) 457 458 deployed_destination = api_util.create_destination( 459 name=name, 460 api_root=self.api_root, 461 workspace_id=self.workspace_id, 462 config=destination_conf_dict, # Wants a dataclass but accepts dict 463 client_id=self.client_id, 464 client_secret=self.client_secret, 465 bearer_token=self.bearer_token, 466 ) 467 return CloudDestination( 468 workspace=self, 469 connector_id=deployed_destination.destination_id, 470 ) 471 472 def permanently_delete_source( 473 self, 474 source: str | CloudSource, 475 *, 476 safe_mode: bool = True, 477 ) -> None: 478 """Delete a source from the workspace. 479 480 You can pass either the source ID `str` or a deployed `Source` object. 481 482 Args: 483 source: The source ID or CloudSource object to delete 484 safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" 485 (case insensitive) to prevent accidental deletion. Defaults to True. 
486 """ 487 if not isinstance(source, (str, CloudSource)): 488 raise exc.PyAirbyteInputError( 489 message="Invalid source type.", 490 input_value=type(source).__name__, 491 ) 492 493 api_util.delete_source( 494 source_id=source.connector_id if isinstance(source, CloudSource) else source, 495 source_name=source.name if isinstance(source, CloudSource) else None, 496 api_root=self.api_root, 497 client_id=self.client_id, 498 client_secret=self.client_secret, 499 bearer_token=self.bearer_token, 500 safe_mode=safe_mode, 501 ) 502 503 # Deploy and delete destinations 504 505 def permanently_delete_destination( 506 self, 507 destination: str | CloudDestination, 508 *, 509 safe_mode: bool = True, 510 ) -> None: 511 """Delete a deployed destination from the workspace. 512 513 You can pass either the `Cache` class or the deployed destination ID as a `str`. 514 515 Args: 516 destination: The destination ID or CloudDestination object to delete 517 safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" 518 (case insensitive) to prevent accidental deletion. Defaults to True. 
519 """ 520 if not isinstance(destination, (str, CloudDestination)): 521 raise exc.PyAirbyteInputError( 522 message="Invalid destination type.", 523 input_value=type(destination).__name__, 524 ) 525 526 api_util.delete_destination( 527 destination_id=( 528 destination if isinstance(destination, str) else destination.destination_id 529 ), 530 destination_name=( 531 destination.name if isinstance(destination, CloudDestination) else None 532 ), 533 api_root=self.api_root, 534 client_id=self.client_id, 535 client_secret=self.client_secret, 536 bearer_token=self.bearer_token, 537 safe_mode=safe_mode, 538 ) 539 540 # Deploy and delete connections 541 542 def deploy_connection( 543 self, 544 connection_name: str, 545 *, 546 source: CloudSource | str, 547 selected_streams: list[str], 548 destination: CloudDestination | str, 549 table_prefix: str | None = None, 550 ) -> CloudConnection: 551 """Create a new connection between an already deployed source and destination. 552 553 Returns the newly deployed connection object. 554 555 Args: 556 connection_name: The name of the connection. 557 source: The deployed source. You can pass a source ID or a CloudSource object. 558 destination: The deployed destination. You can pass a destination ID or a 559 CloudDestination object. 560 table_prefix: Optional. The table prefix to use when syncing to the destination. 561 selected_streams: The selected stream names to sync within the connection. 562 """ 563 if not selected_streams: 564 raise exc.PyAirbyteInputError( 565 guidance="You must provide `selected_streams` when creating a connection." 
566 ) 567 568 source_id: str = source if isinstance(source, str) else source.connector_id 569 destination_id: str = ( 570 destination if isinstance(destination, str) else destination.connector_id 571 ) 572 573 deployed_connection = api_util.create_connection( 574 name=connection_name, 575 source_id=source_id, 576 destination_id=destination_id, 577 api_root=self.api_root, 578 workspace_id=self.workspace_id, 579 selected_stream_names=selected_streams, 580 prefix=table_prefix or "", 581 client_id=self.client_id, 582 client_secret=self.client_secret, 583 bearer_token=self.bearer_token, 584 ) 585 586 return CloudConnection( 587 workspace=self, 588 connection_id=deployed_connection.connection_id, 589 source=deployed_connection.source_id, 590 destination=deployed_connection.destination_id, 591 ) 592 593 def permanently_delete_connection( 594 self, 595 connection: str | CloudConnection, 596 *, 597 cascade_delete_source: bool = False, 598 cascade_delete_destination: bool = False, 599 safe_mode: bool = True, 600 ) -> None: 601 """Delete a deployed connection from the workspace. 602 603 Args: 604 connection: The connection ID or CloudConnection object to delete 605 cascade_delete_source: If True, also delete the source after deleting the connection 606 cascade_delete_destination: If True, also delete the destination after deleting 607 the connection 608 safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" 609 (case insensitive) to prevent accidental deletion. Defaults to True. Also applies 610 to cascade deletes. 
611 """ 612 if connection is None: 613 raise ValueError("No connection ID provided.") 614 615 if isinstance(connection, str): 616 connection = CloudConnection( 617 workspace=self, 618 connection_id=connection, 619 ) 620 621 api_util.delete_connection( 622 connection_id=connection.connection_id, 623 connection_name=connection.name, 624 api_root=self.api_root, 625 workspace_id=self.workspace_id, 626 client_id=self.client_id, 627 client_secret=self.client_secret, 628 bearer_token=self.bearer_token, 629 safe_mode=safe_mode, 630 ) 631 632 if cascade_delete_source: 633 self.permanently_delete_source( 634 source=connection.source_id, 635 safe_mode=safe_mode, 636 ) 637 if cascade_delete_destination: 638 self.permanently_delete_destination( 639 destination=connection.destination_id, 640 safe_mode=safe_mode, 641 ) 642 643 # List workspaces, sources, destinations, and connections 644 645 def list_workspaces( 646 self, 647 name: str | None = None, 648 *, 649 name_filter: Callable | None = None, 650 limit: int | None = None, 651 ) -> list[CloudWorkspaceInfo]: 652 """List workspaces available to the current credentials, with an optional limit.""" 653 return [ 654 CloudWorkspaceInfo.from_api_response(workspace) 655 for workspace in api_util.list_workspaces( 656 workspace_id="", 657 api_root=self.api_root, 658 name=name, 659 name_filter=name_filter, 660 client_id=self.client_id, 661 client_secret=self.client_secret, 662 bearer_token=self.bearer_token, 663 limit=limit, 664 ) 665 ] 666 667 def rename( 668 self, 669 name: str, 670 ) -> CloudWorkspace: 671 """Rename this workspace.""" 672 api_util.rename_workspace( 673 workspace_id=self.workspace_id, 674 name=name, 675 api_root=self.api_root, 676 client_id=self.client_id, 677 client_secret=self.client_secret, 678 bearer_token=self.bearer_token, 679 ) 680 return self 681 682 def permanently_delete( 683 self, 684 *, 685 workspace_name: str | None = None, 686 safe_mode: bool = True, 687 ) -> None: 688 """Permanently delete this workspace 
if it has no connections. 689 690 When `safe_mode` is enabled, the workspace name must contain `delete-me` 691 or `deleteme`. This also checks for existing connections before deleting 692 and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty. 693 """ 694 api_util.permanently_delete_workspace( 695 workspace_id=self.workspace_id, 696 workspace_name=workspace_name, 697 api_root=self.api_root, 698 client_id=self.client_id, 699 client_secret=self.client_secret, 700 bearer_token=self.bearer_token, 701 safe_mode=safe_mode, 702 ) 703 704 def list_connections( 705 self, 706 name: str | None = None, 707 *, 708 name_filter: Callable | None = None, 709 limit: int | None = None, 710 ) -> list[CloudConnection]: 711 """List connections by name in the workspace, with an optional limit.""" 712 connections = api_util.list_connections( 713 api_root=self.api_root, 714 workspace_id=self.workspace_id, 715 name=name, 716 name_filter=name_filter, 717 limit=limit, 718 client_id=self.client_id, 719 client_secret=self.client_secret, 720 bearer_token=self.bearer_token, 721 ) 722 return [ 723 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 724 workspace=self, 725 connection_response=connection, 726 ) 727 for connection in connections 728 ] 729 730 def list_sources( 731 self, 732 name: str | None = None, 733 *, 734 name_filter: Callable | None = None, 735 limit: int | None = None, 736 ) -> list[CloudSource]: 737 """List all sources in the workspace, with an optional limit.""" 738 sources = api_util.list_sources( 739 api_root=self.api_root, 740 workspace_id=self.workspace_id, 741 name=name, 742 name_filter=name_filter, 743 limit=limit, 744 client_id=self.client_id, 745 client_secret=self.client_secret, 746 bearer_token=self.bearer_token, 747 ) 748 return [ 749 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 750 workspace=self, 751 source_response=source, 752 ) 753 for source in sources 754 ] 755 756 def list_destinations( 757 self, 
758 name: str | None = None, 759 *, 760 name_filter: Callable | None = None, 761 limit: int | None = None, 762 ) -> list[CloudDestination]: 763 """List all destinations in the workspace, with an optional limit.""" 764 destinations = api_util.list_destinations( 765 api_root=self.api_root, 766 workspace_id=self.workspace_id, 767 name=name, 768 name_filter=name_filter, 769 limit=limit, 770 client_id=self.client_id, 771 client_secret=self.client_secret, 772 bearer_token=self.bearer_token, 773 ) 774 return [ 775 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 776 workspace=self, 777 destination_response=destination, 778 ) 779 for destination in destinations 780 ] 781 782 def publish_custom_source_definition( 783 self, 784 name: str, 785 *, 786 manifest_yaml: dict[str, Any] | Path | str | None = None, 787 docker_image: str | None = None, 788 docker_tag: str | None = None, 789 unique: bool = True, 790 pre_validate: bool = True, 791 testing_values: dict[str, Any] | None = None, 792 ) -> CustomCloudSourceDefinition: 793 """Publish a custom source connector definition. 794 795 You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image 796 and docker_tag (for Docker connectors), but not both. 797 798 Args: 799 name: Display name for the connector definition 800 manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string) 801 docker_image: Docker repository (e.g., 'airbyte/source-custom') 802 docker_tag: Docker image tag (e.g., '1.0.0') 803 unique: Whether to enforce name uniqueness 804 pre_validate: Whether to validate manifest client-side (YAML only) 805 testing_values: Optional configuration values to use for testing in the 806 Connector Builder UI. If provided, these values are stored as the complete 807 testing values object for the connector builder project (replaces any existing 808 values), allowing immediate test read operations. 
809 810 Returns: 811 CustomCloudSourceDefinition object representing the created definition 812 813 Raises: 814 PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided 815 AirbyteDuplicateResourcesError: If unique=True and name already exists 816 """ 817 is_yaml = manifest_yaml is not None 818 is_docker = docker_image is not None 819 820 if is_yaml == is_docker: 821 raise exc.PyAirbyteInputError( 822 message=( 823 "Must specify EITHER manifest_yaml (for YAML connectors) OR " 824 "docker_image + docker_tag (for Docker connectors), but not both" 825 ), 826 context={ 827 "manifest_yaml_provided": is_yaml, 828 "docker_image_provided": is_docker, 829 }, 830 ) 831 832 if is_docker and docker_tag is None: 833 raise exc.PyAirbyteInputError( 834 message="docker_tag is required when docker_image is specified", 835 context={"docker_image": docker_image}, 836 ) 837 838 if unique: 839 existing = self.list_custom_source_definitions( 840 definition_type="yaml" if is_yaml else "docker", 841 ) 842 if any(d.name == name for d in existing): 843 raise exc.AirbyteDuplicateResourcesError( 844 resource_type="custom_source_definition", 845 resource_name=name, 846 ) 847 848 if is_yaml: 849 manifest_dict: dict[str, Any] 850 if isinstance(manifest_yaml, Path): 851 manifest_dict = yaml.safe_load(manifest_yaml.read_text()) 852 elif isinstance(manifest_yaml, str): 853 manifest_dict = yaml.safe_load(manifest_yaml) 854 elif manifest_yaml is not None: 855 manifest_dict = manifest_yaml 856 else: 857 raise exc.PyAirbyteInputError( 858 message="manifest_yaml is required for YAML connectors", 859 context={"name": name}, 860 ) 861 862 if pre_validate: 863 api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) 864 865 result = api_util.create_custom_yaml_source_definition( 866 name=name, 867 workspace_id=self.workspace_id, 868 manifest=manifest_dict, 869 api_root=self.api_root, 870 client_id=self.client_id, 871 client_secret=self.client_secret, 872 
bearer_token=self.bearer_token, 873 ) 874 custom_definition = CustomCloudSourceDefinition._from_yaml_response( # noqa: SLF001 875 self, result 876 ) 877 878 # Set testing values if provided 879 if testing_values is not None: 880 custom_definition.set_testing_values(testing_values) 881 882 return custom_definition 883 884 raise NotImplementedError( 885 "Docker custom source definitions are not yet supported. " 886 "Only YAML manifest-based custom sources are currently available." 887 ) 888 889 def list_custom_source_definitions( 890 self, 891 *, 892 definition_type: Literal["yaml", "docker"], 893 ) -> list[CustomCloudSourceDefinition]: 894 """List custom source connector definitions. 895 896 Args: 897 definition_type: Connector type to list ("yaml" or "docker"). Required. 898 899 Returns: 900 List of CustomCloudSourceDefinition objects matching the specified type 901 """ 902 if definition_type == "yaml": 903 yaml_definitions = api_util.list_custom_yaml_source_definitions( 904 workspace_id=self.workspace_id, 905 api_root=self.api_root, 906 client_id=self.client_id, 907 client_secret=self.client_secret, 908 bearer_token=self.bearer_token, 909 ) 910 return [ 911 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 912 for d in yaml_definitions 913 ] 914 915 raise NotImplementedError( 916 "Docker custom source definitions are not yet supported. " 917 "Only YAML manifest-based custom sources are currently available." 918 ) 919 920 def get_custom_source_definition( 921 self, 922 definition_id: str, 923 *, 924 definition_type: Literal["yaml", "docker"], 925 ) -> CustomCloudSourceDefinition: 926 """Get a specific custom source definition by ID. 927 928 Args: 929 definition_id: The definition ID 930 definition_type: Connector type ("yaml" or "docker"). Required. 
931 932 Returns: 933 CustomCloudSourceDefinition object 934 """ 935 if definition_type == "yaml": 936 result = api_util.get_custom_yaml_source_definition( 937 workspace_id=self.workspace_id, 938 definition_id=definition_id, 939 api_root=self.api_root, 940 client_id=self.client_id, 941 client_secret=self.client_secret, 942 bearer_token=self.bearer_token, 943 ) 944 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 945 946 raise NotImplementedError( 947 "Docker custom source definitions are not yet supported. " 948 "Only YAML manifest-based custom sources are currently available." 949 )
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
Two authentication methods are supported (mutually exclusive):
- OAuth2 client credentials (client_id + client_secret)
- Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace( workspace_id="...", client_id="...", client_secret="...", )
Example with bearer token:
workspace = CloudWorkspace( workspace_id="...", bearer_token="...", )
def __init__(
    self,
    *,
    workspace_id: str | None = None,
    client_id: str | SecretString | None = None,
    client_secret: str | SecretString | None = None,
    api_root: str | None = None,
    config_api_root: str | None = None,
    bearer_token: str | SecretString | None = None,
) -> None:
    """Resolve credentials for the workspace and store them on the instance.

    If none of `client_id`, `client_secret` or `bearer_token` is supplied, the
    credentials resolver is called with `env_vars=True`.

    Raises:
        PyAirbyteInputError: If the resolved credentials carry no workspace ID.
    """
    # Environment lookup is requested only when no auth value was passed explicitly.
    no_explicit_auth = not any((client_id, client_secret, bearer_token))
    resolved = _AirbyteCredentials.from_auth(
        workspace_id=workspace_id,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
        public_api_root=api_root,
        config_api_root=config_api_root,
        env_vars=no_explicit_auth,
    )
    if not resolved.workspace_id:
        raise exc.PyAirbyteInputError(
            message="Workspace ID is required.",
            guidance="Provide a workspace ID.",
        )

    # Mirror the resolved values onto the public dataclass fields.
    self._credentials = resolved
    self.workspace_id = resolved.workspace_id or ""
    self.client_id = resolved.client_id
    self.client_secret = resolved.client_secret
    self.bearer_token = resolved.bearer_token
    self.api_root = resolved.public_api_root
    self.config_api_root = resolved.config_api_root

    # Building the client config also validates that the auth methods are mutually exclusive.
    client_config_kwargs = {
        "client_id": self.client_id,
        "client_secret": self.client_secret,
        "bearer_token": self.bearer_token,
        "api_root": self.api_root,
        "config_api_root": self.config_api_root,
    }
    self._client_config = CloudClientConfig(**client_config_kwargs)
Validate and initialize credentials.
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
    config_api_root: str | None = None,
) -> CloudWorkspace:
    """Build a CloudWorkspace whose credentials come from environment variables.

    No credentials are passed to the constructor here; only the optional
    workspace ID and API roots are forwarded, and everything else is left for
    the constructor to resolve.

    Two authentication methods are supported (mutually exclusive):
    1. Bearer token (checked first)
    2. OAuth2 client credentials (fallback)

    Environment variables used:
    - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
    - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
    - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
    - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
    - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
    - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

    Args:
        workspace_id: The workspace ID. Falls back to the
            `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable when omitted.
        api_root: The API root URL. Falls back to the `AIRBYTE_CLOUD_API_URL`
            environment variable, or to the Airbyte Cloud API, when omitted.
        config_api_root: The Config API root URL. Falls back to the
            `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable when omitted.

    Returns:
        A CloudWorkspace instance configured with credentials from the environment.

    Raises:
        PyAirbyteInputError: If required credentials are not found in
            the environment or are incomplete.

    Example:
        ```python
        # With workspace_id from environment
        workspace = CloudWorkspace.from_env()

        # With explicit workspace_id
        workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
        ```
    """
    constructor_kwargs = {
        "workspace_id": workspace_id,
        "api_root": api_root,
        "config_api_root": config_api_root,
    }
    return cls(**constructor_kwargs)
Create a CloudWorkspace using credentials from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.
Two authentication methods are supported (mutually exclusive):
- Bearer token (checked first)
- OAuth2 client credentials (fallback)
Environment variables used:
- AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
- AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
- AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
- AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
- AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
- AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.
Arguments:
- workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
- api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
- config_api_root: The Config API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:
A CloudWorkspace instance configured with credentials from the environment.
Raises:
- PyAirbyteInputError: If required credentials are not found in the environment or are incomplete.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
@property
def workspace_url(self) -> str | None:
    """Web URL of this workspace, built from the web root for `api_root`."""
    web_root = get_web_url_root(self.api_root)
    return f"{web_root}/workspaces/{self.workspace_id}"
The web URL of the workspace.
def get_organization(
    self,
    *,
    raise_on_error: bool = True,
) -> CloudOrganization | None:
    """Return the organization that owns this workspace.

    Fetching organization info requires ORGANIZATION_READER permissions on the
    organization, which may not be available with workspace-scoped credentials.

    Args:
        raise_on_error: When True (default), failures are raised. When False,
            `None` is returned instead.

    Returns:
        A CloudOrganization built from the fetched organization ID and name and
        from organization-scoped credentials, or None if `raise_on_error=False`
        and the info could not be fetched or was incomplete.

    Raises:
        AirbyteError: If `raise_on_error=True` and the organization info cannot
            be fetched, or if either the organization ID or name is missing/empty.
        NotImplementedError: Re-raised from the info lookup when
            `raise_on_error=True`.
    """
    try:
        org_info = self._organization_info
    except (AirbyteError, NotImplementedError):
        if not raise_on_error:
            return None
        raise

    org_id = org_info.get("organizationId")
    org_name = org_info.get("organizationName")

    # Happy path: both fields are present and non-empty.
    if org_id and org_name:
        scoped_credentials = self._credentials.with_organization_id(org_id)
        return CloudOrganization(
            organization_id=org_id,
            organization_name=org_name,
            client_id=scoped_credentials.client_id,
            client_secret=scoped_credentials.client_secret,
            bearer_token=scoped_credentials.bearer_token,
            public_api_root=scoped_credentials.public_api_root,
            config_api_root=scoped_credentials.config_api_root,
        )

    # Incomplete info: either swallow or report, depending on the caller's choice.
    if not raise_on_error:
        return None

    raise AirbyteError(
        message="Organization info is incomplete.",
        context={
            "organization_id": org_id,
            "organization_name": org_name,
        },
    )
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
def connect(self) -> None:
    """Verify that the workspace can be reached with the current credentials.

    The workspace is fetched once and the response is discarded; any error
    raised by that API call propagates to the caller. On success a
    confirmation line containing the workspace URL is printed.

    Note: Calling this method is optional. Other operations do not require it;
    it exists as a quick reachability and credentials check.
    """
    api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Return a `CloudConnection` handle for the given connection ID.

    No API request is made here; the returned object loads its data lazily
    when it is needed.
    """
    connection_handle = CloudConnection(workspace=self, connection_id=connection_id)
    return connection_handle
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Return a `CloudSource` handle for the given source ID.

    No API request is made here; the returned object loads its data lazily
    when it is needed.
    """
    source_handle = CloudSource(workspace=self, connector_id=source_id)
    return source_handle
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Return a `CloudDestination` handle for the given destination ID.

    No API request is made here; the returned object loads its data lazily
    when it is needed.
    """
    destination_handle = CloudDestination(workspace=self, connector_id=destination_id)
    return destination_handle
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Create a source in this workspace from a local source object.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy. Its hydrated config is copied and
            sent as the source configuration.
        unique: Whether to require a unique name. If `True`, a source with the
            same (final) name must not already exist. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name. The
            suffix is added before the uniqueness check.

    Returns:
        A `CloudSource` referring to the newly created source.

    Raises:
        AirbyteDuplicateResourcesError: If `unique` is set and a source with
            the final name already exists.
    """
    config_payload = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
    config_payload["sourceType"] = source.name.replace("source-", "")

    deploy_name = name
    if random_name_suffix:
        deploy_name = f"{name} (ID: {text_util.generate_random_suffix()})"

    if unique and self.list_sources(name=deploy_name):
        raise exc.AirbyteDuplicateResourcesError(
            resource_type="source",
            resource_name=deploy_name,
        )

    created = api_util.create_source(
        name=deploy_name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=config_payload,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudSource(
        workspace=self,
        connector_id=created.source_id,
    )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination`
            object or a dictionary of configuration values. A dictionary must
            already contain a `destinationType` key; for a `Destination` object
            the key is derived from the connector name.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name. The
            suffix is added before the uniqueness check.

    Returns:
        A `CloudDestination` object referring to the newly deployed destination.

    Raises:
        PyAirbyteInputError: If a configuration dictionary is given without a
            `destinationType` key.
        AirbyteDuplicateResourcesError: If `unique` is set and a destination
            with the final name already exists.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
    else:
        # Copy so the caller's dictionary is never mutated.
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique and self.list_destinations(name=name):
        raise exc.AirbyteDuplicateResourcesError(
            resource_type="destination",
            resource_name=name,
        )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
Deploy a destination to the workspace.
Returns the newly deployed destination object.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a source from the workspace.

    Accepts either the source ID as a `str` or a `CloudSource` object. When only
    an ID is given, no source name is passed along to the delete call.

    Args:
        source: The source ID or CloudSource object to delete
        safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.

    Raises:
        PyAirbyteInputError: If `source` is neither a `str` nor a `CloudSource`.
    """
    if isinstance(source, CloudSource):
        target_id = source.connector_id
        target_name = source.name
    elif isinstance(source, str):
        target_id = source
        target_name = None
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=target_id,
        source_name=target_name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )
Delete a source from the workspace.
You can pass either the source ID str or a deployed CloudSource object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed destination from the workspace.

    Accepts either the destination ID as a `str` or a `CloudDestination` object.
    When only an ID is given, no destination name is passed along to the delete
    call.

    Args:
        destination: The destination ID or CloudDestination object to delete
        safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.

    Raises:
        PyAirbyteInputError: If `destination` is neither a `str` nor a
            `CloudDestination`.
    """
    if isinstance(destination, CloudDestination):
        # `connector_id` is the attribute the rest of this class uses for both
        # sources and destinations, so use it here as well.
        destination_id = destination.connector_id
        destination_name = destination.name
    elif isinstance(destination, str):
        destination_id = destination
        destination_name = None
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        destination_id=destination_id,
        destination_name=destination_name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )
Delete a deployed destination from the workspace.
You can pass either a CloudDestination object or the deployed destination ID as a str.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a connection between an already deployed source and destination.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a CloudSource object.
        selected_streams: The selected stream names to sync within the connection.
            Must not be empty.
        destination: The deployed destination. You can pass a destination ID or a
            CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
            When omitted, an empty prefix is sent.

    Returns:
        A `CloudConnection` object for the newly created connection.

    Raises:
        PyAirbyteInputError: If `selected_streams` is empty.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    # Accept either a raw ID or a connector object for both endpoints.
    if isinstance(source, str):
        source_id: str = source
    else:
        source_id = source.connector_id

    if isinstance(destination, str):
        destination_id: str = destination
    else:
        destination_id = destination.connector_id

    created = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )

    return CloudConnection(
        workspace=self,
        connection_id=created.connection_id,
        source=created.source_id,
        destination=created.destination_id,
    )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection ID or CloudConnection object to delete
        cascade_delete_source: If True, also delete the source after deleting the connection
        cascade_delete_destination: If True, also delete the destination after deleting
            the connection
        safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
            to cascade deletes.

    Raises:
        ValueError: If `connection` is `None`.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    # Resolve the cascade targets BEFORE deleting the connection. Connection
    # objects may load their details lazily, and those details can no longer be
    # looked up once the connection itself is gone.
    source_id = connection.source_id if cascade_delete_source else None
    destination_id = connection.destination_id if cascade_delete_destination else None

    api_util.delete_connection(
        connection_id=connection.connection_id,
        connection_name=connection.name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )

    if cascade_delete_source:
        self.permanently_delete_source(
            source=source_id,
            safe_mode=safe_mode,
        )
    if cascade_delete_destination:
        self.permanently_delete_destination(
            destination=destination_id,
            safe_mode=safe_mode,
        )
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
def list_workspaces(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudWorkspaceInfo]:
    """List workspaces available to the current credentials, with an optional limit.

    Args:
        name: Optional name passed through to the API helper.
        name_filter: Optional callable passed through to the API helper.
        limit: Optional maximum passed through to the API helper.

    Returns:
        One `CloudWorkspaceInfo` per workspace returned by the API helper.
    """
    # An empty workspace ID is sent on purpose here; NOTE(review): presumably the
    # helper does not scope the listing to one workspace - confirm in api_util.
    raw_workspaces = api_util.list_workspaces(
        workspace_id="",
        api_root=self.api_root,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        limit=limit,
    )
    return [CloudWorkspaceInfo.from_api_response(raw) for raw in raw_workspaces]
List workspaces available to the current credentials, with an optional limit.
def rename(
    self,
    name: str,
) -> CloudWorkspace:
    """Rename this workspace and return the same workspace object.

    Args:
        name: The new workspace name.

    Returns:
        This `CloudWorkspace` instance (`self`), which allows call chaining.
    """
    rename_request = {
        "workspace_id": self.workspace_id,
        "name": name,
        "api_root": self.api_root,
        "client_id": self.client_id,
        "client_secret": self.client_secret,
        "bearer_token": self.bearer_token,
    }
    api_util.rename_workspace(**rename_request)
    return self
Rename this workspace.
def permanently_delete(
    self,
    *,
    workspace_name: str | None = None,
    safe_mode: bool = True,
) -> None:
    """Permanently delete this workspace if it has no connections.

    With `safe_mode` enabled, the workspace name must contain `delete-me` or
    `deleteme`. Existing connections are checked before deleting, and
    `AirbyteWorkspaceNotEmptyError` is raised if the workspace is not empty.

    Args:
        workspace_name: Optional workspace name forwarded to the delete call.
        safe_mode: Whether to enforce the name-based safety check. Defaults to True.
    """
    delete_request = {
        "workspace_id": self.workspace_id,
        "workspace_name": workspace_name,
        "api_root": self.api_root,
        "client_id": self.client_id,
        "client_secret": self.client_secret,
        "bearer_token": self.bearer_token,
        "safe_mode": safe_mode,
    }
    api_util.permanently_delete_workspace(**delete_request)
Permanently delete this workspace if it has no connections.
When safe_mode is enabled, the workspace name must contain delete-me
or deleteme. This also checks for existing connections before deleting
and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudConnection]:
    """List connections in the workspace, optionally filtered by name and limited.

    Args:
        name: Optional name passed through to the API helper.
        name_filter: Optional callable passed through to the API helper.
        limit: Optional maximum passed through to the API helper.

    Returns:
        One `CloudConnection` per connection response returned by the API helper.
    """
    responses = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    build = CloudConnection._from_connection_response  # noqa: SLF001 (non-public API)
    return [build(workspace=self, connection_response=response) for response in responses]
List connections by name in the workspace, with an optional limit.
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudSource]:
    """List sources in the workspace, optionally filtered by name and limited.

    Args:
        name: Optional name passed through to the API helper.
        name_filter: Optional callable passed through to the API helper.
        limit: Optional maximum passed through to the API helper.

    Returns:
        One `CloudSource` per source response returned by the API helper.
    """
    responses = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    build = CloudSource._from_source_response  # noqa: SLF001 (non-public API)
    return [build(workspace=self, source_response=response) for response in responses]
List all sources in the workspace, with an optional limit.
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudDestination]:
    """List destinations in the workspace, optionally filtered by name and limited.

    Args:
        name: Optional name passed through to the API helper.
        name_filter: Optional callable passed through to the API helper.
        limit: Optional maximum passed through to the API helper.

    Returns:
        One `CloudDestination` per destination response returned by the API helper.
    """
    responses = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    build = CloudDestination._from_destination_response  # noqa: SLF001 (non-public API)
    return [build(workspace=self, destination_response=response) for response in responses]
List all destinations in the workspace, with an optional limit.
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both. Only YAML definitions
    can currently be published; a well-formed Docker request is rejected with
    `NotImplementedError`.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string).
            A `str` is parsed as YAML text, not treated as a file path; use a
            `Path` to read from disk. Files are read as UTF-8.
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image
            are provided, if docker_image is given without docker_tag, or if a
            manifest given as a Path or YAML string does not parse to a mapping.
        AirbyteDuplicateResourcesError: If unique=True and name already exists
        NotImplementedError: If a Docker definition is requested.
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    # Exactly one of the two definition styles must be requested.
    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker:
        if docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )
        # Docker definitions always end here; reject them before doing any
        # other work.
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    if unique:
        existing = self.list_custom_source_definitions(definition_type="yaml")
        if any(d.name == name for d in existing):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    manifest_dict: Any
    if isinstance(manifest_yaml, (Path, str)):
        if isinstance(manifest_yaml, Path):
            # Explicit encoding: do not depend on the platform default.
            yaml_text = manifest_yaml.read_text(encoding="utf-8")
        else:
            yaml_text = manifest_yaml
        manifest_dict = yaml.safe_load(yaml_text)

        # `safe_load` returns None for empty input and a scalar for plain text
        # (e.g. a file path passed as `str`); neither is a usable manifest.
        if not isinstance(manifest_dict, dict):
            raise exc.PyAirbyteInputError(
                message="manifest_yaml did not parse to a YAML mapping.",
                context={
                    "name": name,
                    "parsed_type": type(manifest_dict).__name__,
                },
            )
    else:
        manifest_dict = manifest_yaml

    if pre_validate:
        api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

    result = api_util.create_custom_yaml_source_definition(
        name=name,
        workspace_id=self.workspace_id,
        manifest=manifest_dict,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
        self, result
    )

    # Set testing values if provided
    if testing_values is not None:
        custom_definition.set_testing_values(testing_values)

    return custom_definition
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
    """List custom source connector definitions of one type.

    Args:
        definition_type: Connector type to list ("yaml" or "docker"). Required.
            Only "yaml" is currently implemented.

    Returns:
        List of CustomCloudSourceDefinition objects matching the specified type

    Raises:
        NotImplementedError: If `definition_type` is anything other than "yaml".
    """
    if definition_type != "yaml":
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    responses = api_util.list_custom_yaml_source_definitions(
        workspace_id=self.workspace_id,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return [
        CustomCloudSourceDefinition._from_yaml_response(self, response)  # noqa: SLF001
        for response in responses
    ]
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
def get_custom_source_definition(
    self,
    definition_id: str,
    *,
    definition_type: Literal["yaml", "docker"],
) -> CustomCloudSourceDefinition:
    """Fetch one custom source definition by its ID.

    Args:
        definition_id: The definition ID
        definition_type: Connector type ("yaml" or "docker"). Required.
            Only "yaml" is currently implemented.

    Returns:
        CustomCloudSourceDefinition object

    Raises:
        NotImplementedError: If `definition_type` is anything other than "yaml".
    """
    if definition_type != "yaml":
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    response = api_util.get_custom_yaml_source_definition(
        workspace_id=self.workspace_id,
        definition_id=definition_id,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CustomCloudSourceDefinition._from_yaml_response(self, response)  # noqa: SLF001
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object