airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding api_root, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
Usage Examples
Get a new workspace object and deploy a source to it:
import airbyte as ab
from airbyte import cloud
workspace = cloud.CloudWorkspace(
workspace_id="...",
client_id="...",
client_secret="...",
)
# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
name="test-source",
source=source,
)
# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)
# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.

## Usage Examples

Get a new workspace object and deploy a source to it:

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
"""

from __future__ import annotations

from dataclasses import dataclass, field
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, overload

import yaml

from airbyte import exceptions as exc
from airbyte._util import api_util, text_util
from airbyte._util.api_util import get_web_url_root
from airbyte.cloud.auth import (
    resolve_cloud_api_url,
    resolve_cloud_bearer_token,
    resolve_cloud_client_id,
    resolve_cloud_client_secret,
    resolve_cloud_workspace_id,
)
from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import (
    CloudDestination,
    CloudSource,
    CustomCloudSourceDefinition,
)
from airbyte.destinations.base import Destination
from airbyte.exceptions import AirbyteError
from airbyte.secrets.base import SecretString


if TYPE_CHECKING:
    from collections.abc import Callable

    from airbyte.sources.base import Source


@dataclass
class CloudOrganization:
    """Information about an organization in Airbyte Cloud.

    This is a minimal value object returned by CloudWorkspace.get_organization().
    """

    organization_id: str
    """The organization ID."""

    organization_name: str | None = None
    """Display name of the organization."""


@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        client_id="...",
        client_secret="...",
    )
    ```

    Example with bearer token:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        bearer_token="...",
    )
    ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
        - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
        - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
        - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
        - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
        - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the destination ID `str` or a `CloudDestination` object.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
@dataclass
class CloudOrganization:
    """Information about an organization in Airbyte Cloud.

    This is a minimal value object returned by CloudWorkspace.get_organization().
    """

    organization_id: str
    """The organization ID."""

    organization_name: str | None = None
    """Display name of the organization."""
Information about an organization in Airbyte Cloud.
This is a minimal value object returned by CloudWorkspace.get_organization().
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        client_id="...",
        client_secret="...",
    )
    ```

    Example with bearer token:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        bearer_token="...",
    )
    ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided, so they are masked in repr/logs.
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity
        # of bearer-token vs. client-credentials auth).
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
        - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
        - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
        - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
        - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
        - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)
        # Resolve once; both auth branches need the same workspace ID.
        resolved_workspace_id = resolve_cloud_workspace_id(workspace_id)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolved_workspace_id,
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolved_workspace_id,
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str:
        """The web URL of the workspace.

        Note: Return type narrowed from `str | None`; this property always
        returns a string.
        """
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination ID.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # NOTE(review): `destination_id` is presumably an alias of `connector_id`
            # on CloudDestination (other methods use `connector_id`) — confirm.
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            # Normalize the manifest input (Path / YAML string / dict) to a dict.
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                # Defensive: unreachable when is_yaml is True, kept for type narrowing.
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
Two authentication methods are supported (mutually exclusive):
- OAuth2 client credentials (client_id + client_secret)
- Bearer token authentication
Example with client credentials:
```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```
Example with bearer token:
```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```
144 @classmethod 145 def from_env( 146 cls, 147 workspace_id: str | None = None, 148 *, 149 api_root: str | None = None, 150 ) -> CloudWorkspace: 151 """Create a CloudWorkspace using credentials from environment variables. 152 153 This factory method resolves credentials from environment variables, 154 providing a convenient way to create a workspace without explicitly 155 passing credentials. 156 157 Two authentication methods are supported (mutually exclusive): 158 1. Bearer token (checked first) 159 2. OAuth2 client credentials (fallback) 160 161 Environment variables used: 162 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 163 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 164 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 165 - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument). 166 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 167 168 Args: 169 workspace_id: The workspace ID. If not provided, will be resolved from 170 the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable. 171 api_root: The API root URL. If not provided, will be resolved from 172 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 173 the Airbyte Cloud API. 174 175 Returns: 176 A CloudWorkspace instance configured with credentials from the environment. 177 178 Raises: 179 PyAirbyteSecretNotFoundError: If required credentials are not found in 180 the environment. 
181 182 Example: 183 ```python 184 # With workspace_id from environment 185 workspace = CloudWorkspace.from_env() 186 187 # With explicit workspace_id 188 workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id") 189 ``` 190 """ 191 resolved_api_root = resolve_cloud_api_url(api_root) 192 193 # Try bearer token first 194 bearer_token = resolve_cloud_bearer_token() 195 if bearer_token: 196 return cls( 197 workspace_id=resolve_cloud_workspace_id(workspace_id), 198 bearer_token=bearer_token, 199 api_root=resolved_api_root, 200 ) 201 202 # Fall back to client credentials 203 return cls( 204 workspace_id=resolve_cloud_workspace_id(workspace_id), 205 client_id=resolve_cloud_client_id(), 206 client_secret=resolve_cloud_client_secret(), 207 api_root=resolved_api_root, 208 )
Create a CloudWorkspace using credentials from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.
Two authentication methods are supported (mutually exclusive):
- Bearer token (checked first)
- OAuth2 client credentials (fallback)
Environment variables used:
- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
- `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
Arguments:
- workspace_id: The workspace ID. If not provided, will be resolved from the
  `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
- api_root: The API root URL. If not provided, will be resolved from the
  `AIRBYTE_CLOUD_API_URL` environment variable, or default to the Airbyte Cloud API.
Returns:
A CloudWorkspace instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
Example:
```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
210 @property 211 def workspace_url(self) -> str | None: 212 """The web URL of the workspace.""" 213 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
The web URL of the workspace.
247 def get_organization( 248 self, 249 *, 250 raise_on_error: bool = True, 251 ) -> CloudOrganization | None: 252 """Get the organization this workspace belongs to. 253 254 Fetching organization info requires ORGANIZATION_READER permissions on the organization, 255 which may not be available with workspace-scoped credentials. 256 257 Args: 258 raise_on_error: If True (default), raises AirbyteError on permission or API errors. 259 If False, returns None instead of raising. 260 261 Returns: 262 CloudOrganization object with organization_id and organization_name, 263 or None if raise_on_error=False and an error occurred. 264 265 Raises: 266 AirbyteError: If raise_on_error=True and the organization info cannot be fetched 267 (e.g., due to insufficient permissions or missing data). 268 """ 269 try: 270 info = self._organization_info 271 except (AirbyteError, NotImplementedError): 272 if raise_on_error: 273 raise 274 return None 275 276 organization_id = info.get("organizationId") 277 organization_name = info.get("organizationName") 278 279 # Validate that both organization_id and organization_name are non-null and non-empty 280 if not organization_id or not organization_name: 281 if raise_on_error: 282 raise AirbyteError( 283 message="Organization info is incomplete.", 284 context={ 285 "organization_id": organization_id, 286 "organization_name": organization_name, 287 }, 288 ) 289 return None 290 291 return CloudOrganization( 292 organization_id=organization_id, 293 organization_name=organization_name, 294 )
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
298 def connect(self) -> None: 299 """Check that the workspace is reachable and raise an exception otherwise. 300 301 Note: It is not necessary to call this method before calling other operations. It 302 serves primarily as a simple check to ensure that the workspace is reachable 303 and credentials are correct. 304 """ 305 _ = api_util.get_workspace( 306 api_root=self.api_root, 307 workspace_id=self.workspace_id, 308 client_id=self.client_id, 309 client_secret=self.client_secret, 310 bearer_token=self.bearer_token, 311 ) 312 print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
316 def get_connection( 317 self, 318 connection_id: str, 319 ) -> CloudConnection: 320 """Get a connection by ID. 321 322 This method does not fetch data from the API. It returns a `CloudConnection` object, 323 which will be loaded lazily as needed. 324 """ 325 return CloudConnection( 326 workspace=self, 327 connection_id=connection_id, 328 )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
330 def get_source( 331 self, 332 source_id: str, 333 ) -> CloudSource: 334 """Get a source by ID. 335 336 This method does not fetch data from the API. It returns a `CloudSource` object, 337 which will be loaded lazily as needed. 338 """ 339 return CloudSource( 340 workspace=self, 341 connector_id=source_id, 342 )
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
344 def get_destination( 345 self, 346 destination_id: str, 347 ) -> CloudDestination: 348 """Get a destination by ID. 349 350 This method does not fetch data from the API. It returns a `CloudDestination` object, 351 which will be loaded lazily as needed. 352 """ 353 return CloudDestination( 354 workspace=self, 355 connector_id=destination_id, 356 )
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
360 def deploy_source( 361 self, 362 name: str, 363 source: Source, 364 *, 365 unique: bool = True, 366 random_name_suffix: bool = False, 367 ) -> CloudSource: 368 """Deploy a source to the workspace. 369 370 Returns the newly deployed source. 371 372 Args: 373 name: The name to use when deploying. 374 source: The source object to deploy. 375 unique: Whether to require a unique name. If `True`, duplicate names 376 are not allowed. Defaults to `True`. 377 random_name_suffix: Whether to append a random suffix to the name. 378 """ 379 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 380 source_config_dict["sourceType"] = source.name.replace("source-", "") 381 382 if random_name_suffix: 383 name += f" (ID: {text_util.generate_random_suffix()})" 384 385 if unique: 386 existing = self.list_sources(name=name) 387 if existing: 388 raise exc.AirbyteDuplicateResourcesError( 389 resource_type="source", 390 resource_name=name, 391 ) 392 393 deployed_source = api_util.create_source( 394 name=name, 395 api_root=self.api_root, 396 workspace_id=self.workspace_id, 397 config=source_config_dict, 398 client_id=self.client_id, 399 client_secret=self.client_secret, 400 bearer_token=self.bearer_token, 401 ) 402 return CloudSource( 403 workspace=self, 404 connector_id=deployed_source.source_id, 405 )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
407 def deploy_destination( 408 self, 409 name: str, 410 destination: Destination | dict[str, Any], 411 *, 412 unique: bool = True, 413 random_name_suffix: bool = False, 414 ) -> CloudDestination: 415 """Deploy a destination to the workspace. 416 417 Returns the newly deployed destination ID. 418 419 Args: 420 name: The name to use when deploying. 421 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 422 dictionary of configuration values. 423 unique: Whether to require a unique name. If `True`, duplicate names 424 are not allowed. Defaults to `True`. 425 random_name_suffix: Whether to append a random suffix to the name. 426 """ 427 if isinstance(destination, Destination): 428 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 429 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 430 # raise ValueError(destination_conf_dict) 431 else: 432 destination_conf_dict = destination.copy() 433 if "destinationType" not in destination_conf_dict: 434 raise exc.PyAirbyteInputError( 435 message="Missing `destinationType` in configuration dictionary.", 436 ) 437 438 if random_name_suffix: 439 name += f" (ID: {text_util.generate_random_suffix()})" 440 441 if unique: 442 existing = self.list_destinations(name=name) 443 if existing: 444 raise exc.AirbyteDuplicateResourcesError( 445 resource_type="destination", 446 resource_name=name, 447 ) 448 449 deployed_destination = api_util.create_destination( 450 name=name, 451 api_root=self.api_root, 452 workspace_id=self.workspace_id, 453 config=destination_conf_dict, # Wants a dataclass but accepts dict 454 client_id=self.client_id, 455 client_secret=self.client_secret, 456 bearer_token=self.bearer_token, 457 ) 458 return CloudDestination( 459 workspace=self, 460 connector_id=deployed_destination.destination_id, 461 )
Deploy a destination to the workspace.
Returns the newly deployed destination object.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
463 def permanently_delete_source( 464 self, 465 source: str | CloudSource, 466 *, 467 safe_mode: bool = True, 468 ) -> None: 469 """Delete a source from the workspace. 470 471 You can pass either the source ID `str` or a deployed `Source` object. 472 473 Args: 474 source: The source ID or CloudSource object to delete 475 safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" 476 (case insensitive) to prevent accidental deletion. Defaults to True. 477 """ 478 if not isinstance(source, (str, CloudSource)): 479 raise exc.PyAirbyteInputError( 480 message="Invalid source type.", 481 input_value=type(source).__name__, 482 ) 483 484 api_util.delete_source( 485 source_id=source.connector_id if isinstance(source, CloudSource) else source, 486 source_name=source.name if isinstance(source, CloudSource) else None, 487 api_root=self.api_root, 488 client_id=self.client_id, 489 client_secret=self.client_secret, 490 bearer_token=self.bearer_token, 491 safe_mode=safe_mode, 492 )
Delete a source from the workspace.
You can pass either the source ID str or a deployed Source object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
496 def permanently_delete_destination( 497 self, 498 destination: str | CloudDestination, 499 *, 500 safe_mode: bool = True, 501 ) -> None: 502 """Delete a deployed destination from the workspace. 503 504 You can pass either the `Cache` class or the deployed destination ID as a `str`. 505 506 Args: 507 destination: The destination ID or CloudDestination object to delete 508 safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" 509 (case insensitive) to prevent accidental deletion. Defaults to True. 510 """ 511 if not isinstance(destination, (str, CloudDestination)): 512 raise exc.PyAirbyteInputError( 513 message="Invalid destination type.", 514 input_value=type(destination).__name__, 515 ) 516 517 api_util.delete_destination( 518 destination_id=( 519 destination if isinstance(destination, str) else destination.destination_id 520 ), 521 destination_name=( 522 destination.name if isinstance(destination, CloudDestination) else None 523 ), 524 api_root=self.api_root, 525 client_id=self.client_id, 526 client_secret=self.client_secret, 527 bearer_token=self.bearer_token, 528 safe_mode=safe_mode, 529 )
Delete a deployed destination from the workspace.
You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
533 def deploy_connection( 534 self, 535 connection_name: str, 536 *, 537 source: CloudSource | str, 538 selected_streams: list[str], 539 destination: CloudDestination | str, 540 table_prefix: str | None = None, 541 ) -> CloudConnection: 542 """Create a new connection between an already deployed source and destination. 543 544 Returns the newly deployed connection object. 545 546 Args: 547 connection_name: The name of the connection. 548 source: The deployed source. You can pass a source ID or a CloudSource object. 549 destination: The deployed destination. You can pass a destination ID or a 550 CloudDestination object. 551 table_prefix: Optional. The table prefix to use when syncing to the destination. 552 selected_streams: The selected stream names to sync within the connection. 553 """ 554 if not selected_streams: 555 raise exc.PyAirbyteInputError( 556 guidance="You must provide `selected_streams` when creating a connection." 557 ) 558 559 source_id: str = source if isinstance(source, str) else source.connector_id 560 destination_id: str = ( 561 destination if isinstance(destination, str) else destination.connector_id 562 ) 563 564 deployed_connection = api_util.create_connection( 565 name=connection_name, 566 source_id=source_id, 567 destination_id=destination_id, 568 api_root=self.api_root, 569 workspace_id=self.workspace_id, 570 selected_stream_names=selected_streams, 571 prefix=table_prefix or "", 572 client_id=self.client_id, 573 client_secret=self.client_secret, 574 bearer_token=self.bearer_token, 575 ) 576 577 return CloudConnection( 578 workspace=self, 579 connection_id=deployed_connection.connection_id, 580 source=deployed_connection.source_id, 581 destination=deployed_connection.destination_id, 582 )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
584 def permanently_delete_connection( 585 self, 586 connection: str | CloudConnection, 587 *, 588 cascade_delete_source: bool = False, 589 cascade_delete_destination: bool = False, 590 safe_mode: bool = True, 591 ) -> None: 592 """Delete a deployed connection from the workspace. 593 594 Args: 595 connection: The connection ID or CloudConnection object to delete 596 cascade_delete_source: If True, also delete the source after deleting the connection 597 cascade_delete_destination: If True, also delete the destination after deleting 598 the connection 599 safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" 600 (case insensitive) to prevent accidental deletion. Defaults to True. Also applies 601 to cascade deletes. 602 """ 603 if connection is None: 604 raise ValueError("No connection ID provided.") 605 606 if isinstance(connection, str): 607 connection = CloudConnection( 608 workspace=self, 609 connection_id=connection, 610 ) 611 612 api_util.delete_connection( 613 connection_id=connection.connection_id, 614 connection_name=connection.name, 615 api_root=self.api_root, 616 workspace_id=self.workspace_id, 617 client_id=self.client_id, 618 client_secret=self.client_secret, 619 bearer_token=self.bearer_token, 620 safe_mode=safe_mode, 621 ) 622 623 if cascade_delete_source: 624 self.permanently_delete_source( 625 source=connection.source_id, 626 safe_mode=safe_mode, 627 ) 628 if cascade_delete_destination: 629 self.permanently_delete_destination( 630 destination=connection.destination_id, 631 safe_mode=safe_mode, 632 )
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
636 def list_connections( 637 self, 638 name: str | None = None, 639 *, 640 name_filter: Callable | None = None, 641 ) -> list[CloudConnection]: 642 """List connections by name in the workspace. 643 644 TODO: Add pagination support 645 """ 646 connections = api_util.list_connections( 647 api_root=self.api_root, 648 workspace_id=self.workspace_id, 649 name=name, 650 name_filter=name_filter, 651 client_id=self.client_id, 652 client_secret=self.client_secret, 653 bearer_token=self.bearer_token, 654 ) 655 return [ 656 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 657 workspace=self, 658 connection_response=connection, 659 ) 660 for connection in connections 661 if name is None or connection.name == name 662 ]
List connections by name in the workspace.
TODO: Add pagination support
664 def list_sources( 665 self, 666 name: str | None = None, 667 *, 668 name_filter: Callable | None = None, 669 ) -> list[CloudSource]: 670 """List all sources in the workspace. 671 672 TODO: Add pagination support 673 """ 674 sources = api_util.list_sources( 675 api_root=self.api_root, 676 workspace_id=self.workspace_id, 677 name=name, 678 name_filter=name_filter, 679 client_id=self.client_id, 680 client_secret=self.client_secret, 681 bearer_token=self.bearer_token, 682 ) 683 return [ 684 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 685 workspace=self, 686 source_response=source, 687 ) 688 for source in sources 689 if name is None or source.name == name 690 ]
List all sources in the workspace.
TODO: Add pagination support
692 def list_destinations( 693 self, 694 name: str | None = None, 695 *, 696 name_filter: Callable | None = None, 697 ) -> list[CloudDestination]: 698 """List all destinations in the workspace. 699 700 TODO: Add pagination support 701 """ 702 destinations = api_util.list_destinations( 703 api_root=self.api_root, 704 workspace_id=self.workspace_id, 705 name=name, 706 name_filter=name_filter, 707 client_id=self.client_id, 708 client_secret=self.client_secret, 709 bearer_token=self.bearer_token, 710 ) 711 return [ 712 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 713 workspace=self, 714 destination_response=destination, 715 ) 716 for destination in destinations 717 if name is None or destination.name == name 718 ]
List all destinations in the workspace.
TODO: Add pagination support
720 def publish_custom_source_definition( 721 self, 722 name: str, 723 *, 724 manifest_yaml: dict[str, Any] | Path | str | None = None, 725 docker_image: str | None = None, 726 docker_tag: str | None = None, 727 unique: bool = True, 728 pre_validate: bool = True, 729 testing_values: dict[str, Any] | None = None, 730 ) -> CustomCloudSourceDefinition: 731 """Publish a custom source connector definition. 732 733 You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image 734 and docker_tag (for Docker connectors), but not both. 735 736 Args: 737 name: Display name for the connector definition 738 manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string) 739 docker_image: Docker repository (e.g., 'airbyte/source-custom') 740 docker_tag: Docker image tag (e.g., '1.0.0') 741 unique: Whether to enforce name uniqueness 742 pre_validate: Whether to validate manifest client-side (YAML only) 743 testing_values: Optional configuration values to use for testing in the 744 Connector Builder UI. If provided, these values are stored as the complete 745 testing values object for the connector builder project (replaces any existing 746 values), allowing immediate test read operations. 
747 748 Returns: 749 CustomCloudSourceDefinition object representing the created definition 750 751 Raises: 752 PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided 753 AirbyteDuplicateResourcesError: If unique=True and name already exists 754 """ 755 is_yaml = manifest_yaml is not None 756 is_docker = docker_image is not None 757 758 if is_yaml == is_docker: 759 raise exc.PyAirbyteInputError( 760 message=( 761 "Must specify EITHER manifest_yaml (for YAML connectors) OR " 762 "docker_image + docker_tag (for Docker connectors), but not both" 763 ), 764 context={ 765 "manifest_yaml_provided": is_yaml, 766 "docker_image_provided": is_docker, 767 }, 768 ) 769 770 if is_docker and docker_tag is None: 771 raise exc.PyAirbyteInputError( 772 message="docker_tag is required when docker_image is specified", 773 context={"docker_image": docker_image}, 774 ) 775 776 if unique: 777 existing = self.list_custom_source_definitions( 778 definition_type="yaml" if is_yaml else "docker", 779 ) 780 if any(d.name == name for d in existing): 781 raise exc.AirbyteDuplicateResourcesError( 782 resource_type="custom_source_definition", 783 resource_name=name, 784 ) 785 786 if is_yaml: 787 manifest_dict: dict[str, Any] 788 if isinstance(manifest_yaml, Path): 789 manifest_dict = yaml.safe_load(manifest_yaml.read_text()) 790 elif isinstance(manifest_yaml, str): 791 manifest_dict = yaml.safe_load(manifest_yaml) 792 elif manifest_yaml is not None: 793 manifest_dict = manifest_yaml 794 else: 795 raise exc.PyAirbyteInputError( 796 message="manifest_yaml is required for YAML connectors", 797 context={"name": name}, 798 ) 799 800 if pre_validate: 801 api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) 802 803 result = api_util.create_custom_yaml_source_definition( 804 name=name, 805 workspace_id=self.workspace_id, 806 manifest=manifest_dict, 807 api_root=self.api_root, 808 client_id=self.client_id, 809 client_secret=self.client_secret, 810 
bearer_token=self.bearer_token, 811 ) 812 custom_definition = CustomCloudSourceDefinition._from_yaml_response( # noqa: SLF001 813 self, result 814 ) 815 816 # Set testing values if provided 817 if testing_values is not None: 818 custom_definition.set_testing_values(testing_values) 819 820 return custom_definition 821 822 raise NotImplementedError( 823 "Docker custom source definitions are not yet supported. " 824 "Only YAML manifest-based custom sources are currently available." 825 )
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
827 def list_custom_source_definitions( 828 self, 829 *, 830 definition_type: Literal["yaml", "docker"], 831 ) -> list[CustomCloudSourceDefinition]: 832 """List custom source connector definitions. 833 834 Args: 835 definition_type: Connector type to list ("yaml" or "docker"). Required. 836 837 Returns: 838 List of CustomCloudSourceDefinition objects matching the specified type 839 """ 840 if definition_type == "yaml": 841 yaml_definitions = api_util.list_custom_yaml_source_definitions( 842 workspace_id=self.workspace_id, 843 api_root=self.api_root, 844 client_id=self.client_id, 845 client_secret=self.client_secret, 846 bearer_token=self.bearer_token, 847 ) 848 return [ 849 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 850 for d in yaml_definitions 851 ] 852 853 raise NotImplementedError( 854 "Docker custom source definitions are not yet supported. " 855 "Only YAML manifest-based custom sources are currently available." 856 )
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
858 def get_custom_source_definition( 859 self, 860 definition_id: str, 861 *, 862 definition_type: Literal["yaml", "docker"], 863 ) -> CustomCloudSourceDefinition: 864 """Get a specific custom source definition by ID. 865 866 Args: 867 definition_id: The definition ID 868 definition_type: Connector type ("yaml" or "docker"). Required. 869 870 Returns: 871 CustomCloudSourceDefinition object 872 """ 873 if definition_type == "yaml": 874 result = api_util.get_custom_yaml_source_definition( 875 workspace_id=self.workspace_id, 876 definition_id=definition_id, 877 api_root=self.api_root, 878 client_id=self.client_id, 879 client_secret=self.client_secret, 880 bearer_token=self.bearer_token, 881 ) 882 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 883 884 raise NotImplementedError( 885 "Docker custom source definitions are not yet supported. " 886 "Only YAML manifest-based custom sources are currently available." 887 )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object