airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
   1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
   2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
   3
   4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
   5both OSS and Enterprise.
   6
   7## Usage Examples
   8
   9Get a new workspace object and deploy a source to it:
  10
  11```python
  12import airbyte as ab
  13from airbyte import cloud
  14
  15workspace = cloud.CloudWorkspace(
  16    workspace_id="...",
  17    client_id="...",
  18    client_secret="...",
  19)
  20
  21# Deploy a source to the workspace
  22source = ab.get_source("source-faker", config={"count": 100})
  23deployed_source = workspace.deploy_source(
  24    name="test-source",
  25    source=source,
  26)
  27
  28# Run a check on the deployed source and raise an exception if the check fails
  29check_result = deployed_source.check(raise_on_error=True)
  30
  31# Permanently delete the newly-created source
  32workspace.permanently_delete_source(deployed_source)
  33```
  34"""
  35
  36from __future__ import annotations
  37
  38from dataclasses import dataclass, field
  39from functools import cached_property
  40from pathlib import Path
  41from typing import TYPE_CHECKING, Any, Literal, overload
  42
  43import yaml
  44
  45from airbyte import exceptions as exc
  46from airbyte._util import api_util, text_util
  47from airbyte._util.api_util import get_web_url_root
  48from airbyte.cloud.auth import (
  49    resolve_cloud_api_url,
  50    resolve_cloud_bearer_token,
  51    resolve_cloud_client_id,
  52    resolve_cloud_client_secret,
  53    resolve_cloud_workspace_id,
  54)
  55from airbyte.cloud.client_config import CloudClientConfig
  56from airbyte.cloud.connections import CloudConnection
  57from airbyte.cloud.connectors import (
  58    CloudDestination,
  59    CloudSource,
  60    CustomCloudSourceDefinition,
  61)
  62from airbyte.destinations.base import Destination
  63from airbyte.exceptions import AirbyteError
  64from airbyte.secrets.base import SecretString
  65
  66
  67if TYPE_CHECKING:
  68    from collections.abc import Callable
  69
  70    from airbyte.sources.base import Source
  71
  72
  73class CloudOrganization:
  74    """Information about an organization in Airbyte Cloud.
  75
  76    This class provides lazy loading of organization attributes including billing status.
  77    It is typically created via CloudWorkspace.get_organization().
  78    """
  79
  80    def __init__(
  81        self,
  82        organization_id: str,
  83        organization_name: str | None = None,
  84        email: str | None = None,
  85        *,
  86        api_root: str = api_util.CLOUD_API_ROOT,
  87        client_id: SecretString | None = None,
  88        client_secret: SecretString | None = None,
  89        bearer_token: SecretString | None = None,
  90    ) -> None:
  91        """Initialize a CloudOrganization.
  92
  93        Args:
  94            organization_id: The organization ID.
  95            organization_name: Display name of the organization.
  96            email: Email associated with the organization.
  97            api_root: The API root URL.
  98            client_id: OAuth client ID for authentication.
  99            client_secret: OAuth client secret for authentication.
 100            bearer_token: Bearer token for authentication (alternative to client credentials).
 101        """
 102        self.organization_id = organization_id
 103        """The organization ID."""
 104
 105        self._organization_name = organization_name
 106        """Display name of the organization."""
 107
 108        self._email = email
 109        """Email associated with the organization."""
 110
 111        self._api_root = api_root
 112        self._client_id = client_id
 113        self._client_secret = client_secret
 114        self._bearer_token = bearer_token
 115
 116        # Cached organization info (billing, etc.)
 117        self._organization_info: dict[str, Any] | None = None
 118        # Flag to remember if fetching organization info failed (e.g., permission issues)
 119        self._organization_info_fetch_failed: bool = False
 120
 121    def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]:
 122        """Fetch and cache organization info including billing status.
 123
 124        If fetching fails (e.g., due to permission issues), the failure is cached and
 125        subsequent calls will return an empty dict without retrying.
 126
 127        Args:
 128            force_refresh: If True, always fetch from the API even if cached.
 129
 130        Returns:
 131            Dictionary containing organization info including billing data.
 132            Returns empty dict if fetching failed or is not permitted.
 133        """
 134        # Reset failure flag if force_refresh is requested
 135        if force_refresh:
 136            self._organization_info_fetch_failed = False
 137
 138        # If we already know fetching failed, return empty dict without retrying
 139        if self._organization_info_fetch_failed:
 140            return {}
 141
 142        if not force_refresh and self._organization_info is not None:
 143            return self._organization_info
 144
 145        try:
 146            self._organization_info = api_util.get_organization_info(
 147                organization_id=self.organization_id,
 148                api_root=self._api_root,
 149                client_id=self._client_id,
 150                client_secret=self._client_secret,
 151                bearer_token=self._bearer_token,
 152            )
 153        except Exception:
 154            # Cache the failure so we don't retry on subsequent property accesses
 155            self._organization_info_fetch_failed = True
 156            return {}
 157        else:
 158            return self._organization_info
 159
 160    @property
 161    def organization_name(self) -> str | None:
 162        """Display name of the organization."""
 163        if self._organization_name is not None:
 164            return self._organization_name
 165        # Try to fetch from API if not set (returns empty dict on failure)
 166        info = self._fetch_organization_info()
 167        return info.get("organizationName")
 168
 169    @property
 170    def email(self) -> str | None:
 171        """Email associated with the organization."""
 172        if self._email is not None:
 173            return self._email
 174        # Try to fetch from API if not set (returns empty dict on failure)
 175        info = self._fetch_organization_info()
 176        return info.get("email")
 177
 178    @property
 179    def payment_status(self) -> str | None:
 180        """Payment status of the organization.
 181
 182        Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'.
 183        When 'disabled', syncs are blocked due to unpaid invoices.
 184        Returns None if billing info is not available (e.g., due to permission issues).
 185        """
 186        info = self._fetch_organization_info()
 187        return (info.get("billing") or {}).get("paymentStatus")
 188
 189    @property
 190    def subscription_status(self) -> str | None:
 191        """Subscription status of the organization.
 192
 193        Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'.
 194        Returns None if billing info is not available (e.g., due to permission issues).
 195        """
 196        info = self._fetch_organization_info()
 197        return (info.get("billing") or {}).get("subscriptionStatus")
 198
 199    @property
 200    def is_account_locked(self) -> bool:
 201        """Whether the account is locked due to billing issues.
 202
 203        Returns True if payment_status is 'disabled'/'locked' or subscription_status is
 204        'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.
 205        """
 206        return api_util.is_account_locked(self.payment_status, self.subscription_status)
 207
 208
 209@dataclass
 210class CloudWorkspace:
 211    """A remote workspace on the Airbyte Cloud.
 212
 213    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 214    instances, both OSS and Enterprise.
 215
 216    Two authentication methods are supported (mutually exclusive):
 217    1. OAuth2 client credentials (client_id + client_secret)
 218    2. Bearer token authentication
 219
 220    Example with client credentials:
 221        ```python
 222        workspace = CloudWorkspace(
 223            workspace_id="...",
 224            client_id="...",
 225            client_secret="...",
 226        )
 227        ```
 228
 229    Example with bearer token:
 230        ```python
 231        workspace = CloudWorkspace(
 232            workspace_id="...",
 233            bearer_token="...",
 234        )
 235        ```
 236    """
 237
 238    workspace_id: str
 239    client_id: SecretString | None = None
 240    client_secret: SecretString | None = None
 241    api_root: str = api_util.CLOUD_API_ROOT
 242    bearer_token: SecretString | None = None
 243
 244    # Internal credentials object (set in __post_init__, excluded from __init__)
 245    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)
 246
 247    def __post_init__(self) -> None:
 248        """Validate and initialize credentials."""
 249        # Wrap secrets in SecretString if provided
 250        if self.client_id is not None:
 251            self.client_id = SecretString(self.client_id)
 252        if self.client_secret is not None:
 253            self.client_secret = SecretString(self.client_secret)
 254        if self.bearer_token is not None:
 255            self.bearer_token = SecretString(self.bearer_token)
 256
 257        # Create internal CloudClientConfig object (validates mutual exclusivity)
 258        self._credentials = CloudClientConfig(
 259            client_id=self.client_id,
 260            client_secret=self.client_secret,
 261            bearer_token=self.bearer_token,
 262            api_root=self.api_root,
 263        )
 264
 265    @classmethod
 266    def from_env(
 267        cls,
 268        workspace_id: str | None = None,
 269        *,
 270        api_root: str | None = None,
 271    ) -> CloudWorkspace:
 272        """Create a CloudWorkspace using credentials from environment variables.
 273
 274        This factory method resolves credentials from environment variables,
 275        providing a convenient way to create a workspace without explicitly
 276        passing credentials.
 277
 278        Two authentication methods are supported (mutually exclusive):
 279        1. Bearer token (checked first)
 280        2. OAuth2 client credentials (fallback)
 281
 282        Environment variables used:
 283            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
 284            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
 285            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
 286            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
 287            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
 288
 289        Args:
 290            workspace_id: The workspace ID. If not provided, will be resolved from
 291                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
 292            api_root: The API root URL. If not provided, will be resolved from
 293                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
 294                the Airbyte Cloud API.
 295
 296        Returns:
 297            A CloudWorkspace instance configured with credentials from the environment.
 298
 299        Raises:
 300            PyAirbyteSecretNotFoundError: If required credentials are not found in
 301                the environment.
 302
 303        Example:
 304            ```python
 305            # With workspace_id from environment
 306            workspace = CloudWorkspace.from_env()
 307
 308            # With explicit workspace_id
 309            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
 310            ```
 311        """
 312        resolved_api_root = resolve_cloud_api_url(api_root)
 313
 314        # Try bearer token first
 315        bearer_token = resolve_cloud_bearer_token()
 316        if bearer_token:
 317            return cls(
 318                workspace_id=resolve_cloud_workspace_id(workspace_id),
 319                bearer_token=bearer_token,
 320                api_root=resolved_api_root,
 321            )
 322
 323        # Fall back to client credentials
 324        return cls(
 325            workspace_id=resolve_cloud_workspace_id(workspace_id),
 326            client_id=resolve_cloud_client_id(),
 327            client_secret=resolve_cloud_client_secret(),
 328            api_root=resolved_api_root,
 329        )
 330
 331    @property
 332    def workspace_url(self) -> str | None:
 333        """The web URL of the workspace."""
 334        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 335
 336    @cached_property
 337    def _organization_info(self) -> dict[str, Any]:
 338        """Fetch and cache organization info for this workspace.
 339
 340        Uses the Config API endpoint for an efficient O(1) lookup.
 341        This is an internal method; use get_organization() for public access.
 342        """
 343        return api_util.get_workspace_organization_info(
 344            workspace_id=self.workspace_id,
 345            api_root=self.api_root,
 346            client_id=self.client_id,
 347            client_secret=self.client_secret,
 348            bearer_token=self.bearer_token,
 349        )
 350
 351    @overload
 352    def get_organization(self) -> CloudOrganization: ...
 353
 354    @overload
 355    def get_organization(
 356        self,
 357        *,
 358        raise_on_error: Literal[True],
 359    ) -> CloudOrganization: ...
 360
 361    @overload
 362    def get_organization(
 363        self,
 364        *,
 365        raise_on_error: Literal[False],
 366    ) -> CloudOrganization | None: ...
 367
 368    def get_organization(
 369        self,
 370        *,
 371        raise_on_error: bool = True,
 372    ) -> CloudOrganization | None:
 373        """Get the organization this workspace belongs to.
 374
 375        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
 376        which may not be available with workspace-scoped credentials.
 377
 378        Args:
 379            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
 380                If False, returns None instead of raising.
 381
 382        Returns:
 383            CloudOrganization object with organization_id and organization_name,
 384            or None if raise_on_error=False and an error occurred.
 385
 386        Raises:
 387            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
 388                (e.g., due to insufficient permissions or missing data).
 389        """
 390        try:
 391            info = self._organization_info
 392        except (AirbyteError, NotImplementedError):
 393            if raise_on_error:
 394                raise
 395            return None
 396
 397        organization_id = info.get("organizationId")
 398        organization_name = info.get("organizationName")
 399
 400        # Validate that both organization_id and organization_name are non-null and non-empty
 401        if not organization_id or not organization_name:
 402            if raise_on_error:
 403                raise AirbyteError(
 404                    message="Organization info is incomplete.",
 405                    context={
 406                        "organization_id": organization_id,
 407                        "organization_name": organization_name,
 408                    },
 409                )
 410            return None
 411
 412        return CloudOrganization(
 413            organization_id=organization_id,
 414            organization_name=organization_name,
 415            api_root=self.api_root,
 416            client_id=self.client_id,
 417            client_secret=self.client_secret,
 418            bearer_token=self.bearer_token,
 419        )
 420
 421    # Test connection and creds
 422
 423    def connect(self) -> None:
 424        """Check that the workspace is reachable and raise an exception otherwise.
 425
 426        Note: It is not necessary to call this method before calling other operations. It
 427              serves primarily as a simple check to ensure that the workspace is reachable
 428              and credentials are correct.
 429        """
 430        _ = api_util.get_workspace(
 431            api_root=self.api_root,
 432            workspace_id=self.workspace_id,
 433            client_id=self.client_id,
 434            client_secret=self.client_secret,
 435            bearer_token=self.bearer_token,
 436        )
 437        print(f"Successfully connected to workspace: {self.workspace_url}")
 438
 439    # Get sources, destinations, and connections
 440
 441    def get_connection(
 442        self,
 443        connection_id: str,
 444    ) -> CloudConnection:
 445        """Get a connection by ID.
 446
 447        This method does not fetch data from the API. It returns a `CloudConnection` object,
 448        which will be loaded lazily as needed.
 449        """
 450        return CloudConnection(
 451            workspace=self,
 452            connection_id=connection_id,
 453        )
 454
 455    def get_source(
 456        self,
 457        source_id: str,
 458    ) -> CloudSource:
 459        """Get a source by ID.
 460
 461        This method does not fetch data from the API. It returns a `CloudSource` object,
 462        which will be loaded lazily as needed.
 463        """
 464        return CloudSource(
 465            workspace=self,
 466            connector_id=source_id,
 467        )
 468
 469    def get_destination(
 470        self,
 471        destination_id: str,
 472    ) -> CloudDestination:
 473        """Get a destination by ID.
 474
 475        This method does not fetch data from the API. It returns a `CloudDestination` object,
 476        which will be loaded lazily as needed.
 477        """
 478        return CloudDestination(
 479            workspace=self,
 480            connector_id=destination_id,
 481        )
 482
 483    # Deploy sources and destinations
 484
 485    def deploy_source(
 486        self,
 487        name: str,
 488        source: Source,
 489        *,
 490        unique: bool = True,
 491        random_name_suffix: bool = False,
 492    ) -> CloudSource:
 493        """Deploy a source to the workspace.
 494
 495        Returns the newly deployed source.
 496
 497        Args:
 498            name: The name to use when deploying.
 499            source: The source object to deploy.
 500            unique: Whether to require a unique name. If `True`, duplicate names
 501                are not allowed. Defaults to `True`.
 502            random_name_suffix: Whether to append a random suffix to the name.
 503        """
 504        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
 505        source_config_dict["sourceType"] = source.name.replace("source-", "")
 506
 507        if random_name_suffix:
 508            name += f" (ID: {text_util.generate_random_suffix()})"
 509
 510        if unique:
 511            existing = self.list_sources(name=name)
 512            if existing:
 513                raise exc.AirbyteDuplicateResourcesError(
 514                    resource_type="source",
 515                    resource_name=name,
 516                )
 517
 518        deployed_source = api_util.create_source(
 519            name=name,
 520            api_root=self.api_root,
 521            workspace_id=self.workspace_id,
 522            config=source_config_dict,
 523            client_id=self.client_id,
 524            client_secret=self.client_secret,
 525            bearer_token=self.bearer_token,
 526        )
 527        return CloudSource(
 528            workspace=self,
 529            connector_id=deployed_source.source_id,
 530        )
 531
 532    def deploy_destination(
 533        self,
 534        name: str,
 535        destination: Destination | dict[str, Any],
 536        *,
 537        unique: bool = True,
 538        random_name_suffix: bool = False,
 539    ) -> CloudDestination:
 540        """Deploy a destination to the workspace.
 541
 542        Returns the newly deployed destination ID.
 543
 544        Args:
 545            name: The name to use when deploying.
 546            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
 547                dictionary of configuration values.
 548            unique: Whether to require a unique name. If `True`, duplicate names
 549                are not allowed. Defaults to `True`.
 550            random_name_suffix: Whether to append a random suffix to the name.
 551        """
 552        if isinstance(destination, Destination):
 553            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
 554            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
 555            # raise ValueError(destination_conf_dict)
 556        else:
 557            destination_conf_dict = destination.copy()
 558            if "destinationType" not in destination_conf_dict:
 559                raise exc.PyAirbyteInputError(
 560                    message="Missing `destinationType` in configuration dictionary.",
 561                )
 562
 563        if random_name_suffix:
 564            name += f" (ID: {text_util.generate_random_suffix()})"
 565
 566        if unique:
 567            existing = self.list_destinations(name=name)
 568            if existing:
 569                raise exc.AirbyteDuplicateResourcesError(
 570                    resource_type="destination",
 571                    resource_name=name,
 572                )
 573
 574        deployed_destination = api_util.create_destination(
 575            name=name,
 576            api_root=self.api_root,
 577            workspace_id=self.workspace_id,
 578            config=destination_conf_dict,  # Wants a dataclass but accepts dict
 579            client_id=self.client_id,
 580            client_secret=self.client_secret,
 581            bearer_token=self.bearer_token,
 582        )
 583        return CloudDestination(
 584            workspace=self,
 585            connector_id=deployed_destination.destination_id,
 586        )
 587
 588    def permanently_delete_source(
 589        self,
 590        source: str | CloudSource,
 591        *,
 592        safe_mode: bool = True,
 593    ) -> None:
 594        """Delete a source from the workspace.
 595
 596        You can pass either the source ID `str` or a deployed `Source` object.
 597
 598        Args:
 599            source: The source ID or CloudSource object to delete
 600            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
 601                (case insensitive) to prevent accidental deletion. Defaults to True.
 602        """
 603        if not isinstance(source, (str, CloudSource)):
 604            raise exc.PyAirbyteInputError(
 605                message="Invalid source type.",
 606                input_value=type(source).__name__,
 607            )
 608
 609        api_util.delete_source(
 610            source_id=source.connector_id if isinstance(source, CloudSource) else source,
 611            source_name=source.name if isinstance(source, CloudSource) else None,
 612            api_root=self.api_root,
 613            client_id=self.client_id,
 614            client_secret=self.client_secret,
 615            bearer_token=self.bearer_token,
 616            safe_mode=safe_mode,
 617        )
 618
 619    # Deploy and delete destinations
 620
 621    def permanently_delete_destination(
 622        self,
 623        destination: str | CloudDestination,
 624        *,
 625        safe_mode: bool = True,
 626    ) -> None:
 627        """Delete a deployed destination from the workspace.
 628
 629        You can pass either the `Cache` class or the deployed destination ID as a `str`.
 630
 631        Args:
 632            destination: The destination ID or CloudDestination object to delete
 633            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
 634                (case insensitive) to prevent accidental deletion. Defaults to True.
 635        """
 636        if not isinstance(destination, (str, CloudDestination)):
 637            raise exc.PyAirbyteInputError(
 638                message="Invalid destination type.",
 639                input_value=type(destination).__name__,
 640            )
 641
 642        api_util.delete_destination(
 643            destination_id=(
 644                destination if isinstance(destination, str) else destination.destination_id
 645            ),
 646            destination_name=(
 647                destination.name if isinstance(destination, CloudDestination) else None
 648            ),
 649            api_root=self.api_root,
 650            client_id=self.client_id,
 651            client_secret=self.client_secret,
 652            bearer_token=self.bearer_token,
 653            safe_mode=safe_mode,
 654        )
 655
 656    # Deploy and delete connections
 657
 658    def deploy_connection(
 659        self,
 660        connection_name: str,
 661        *,
 662        source: CloudSource | str,
 663        selected_streams: list[str],
 664        destination: CloudDestination | str,
 665        table_prefix: str | None = None,
 666    ) -> CloudConnection:
 667        """Create a new connection between an already deployed source and destination.
 668
 669        Returns the newly deployed connection object.
 670
 671        Args:
 672            connection_name: The name of the connection.
 673            source: The deployed source. You can pass a source ID or a CloudSource object.
 674            destination: The deployed destination. You can pass a destination ID or a
 675                CloudDestination object.
 676            table_prefix: Optional. The table prefix to use when syncing to the destination.
 677            selected_streams: The selected stream names to sync within the connection.
 678        """
 679        if not selected_streams:
 680            raise exc.PyAirbyteInputError(
 681                guidance="You must provide `selected_streams` when creating a connection."
 682            )
 683
 684        source_id: str = source if isinstance(source, str) else source.connector_id
 685        destination_id: str = (
 686            destination if isinstance(destination, str) else destination.connector_id
 687        )
 688
 689        deployed_connection = api_util.create_connection(
 690            name=connection_name,
 691            source_id=source_id,
 692            destination_id=destination_id,
 693            api_root=self.api_root,
 694            workspace_id=self.workspace_id,
 695            selected_stream_names=selected_streams,
 696            prefix=table_prefix or "",
 697            client_id=self.client_id,
 698            client_secret=self.client_secret,
 699            bearer_token=self.bearer_token,
 700        )
 701
 702        return CloudConnection(
 703            workspace=self,
 704            connection_id=deployed_connection.connection_id,
 705            source=deployed_connection.source_id,
 706            destination=deployed_connection.destination_id,
 707        )
 708
 709    def permanently_delete_connection(
 710        self,
 711        connection: str | CloudConnection,
 712        *,
 713        cascade_delete_source: bool = False,
 714        cascade_delete_destination: bool = False,
 715        safe_mode: bool = True,
 716    ) -> None:
 717        """Delete a deployed connection from the workspace.
 718
 719        Args:
 720            connection: The connection ID or CloudConnection object to delete
 721            cascade_delete_source: If True, also delete the source after deleting the connection
 722            cascade_delete_destination: If True, also delete the destination after deleting
 723                the connection
 724            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
 725                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
 726                to cascade deletes.
 727        """
 728        if connection is None:
 729            raise ValueError("No connection ID provided.")
 730
 731        if isinstance(connection, str):
 732            connection = CloudConnection(
 733                workspace=self,
 734                connection_id=connection,
 735            )
 736
 737        api_util.delete_connection(
 738            connection_id=connection.connection_id,
 739            connection_name=connection.name,
 740            api_root=self.api_root,
 741            workspace_id=self.workspace_id,
 742            client_id=self.client_id,
 743            client_secret=self.client_secret,
 744            bearer_token=self.bearer_token,
 745            safe_mode=safe_mode,
 746        )
 747
 748        if cascade_delete_source:
 749            self.permanently_delete_source(
 750                source=connection.source_id,
 751                safe_mode=safe_mode,
 752            )
 753        if cascade_delete_destination:
 754            self.permanently_delete_destination(
 755                destination=connection.destination_id,
 756                safe_mode=safe_mode,
 757            )
 758
 759    # List sources, destinations, and connections
 760
 761    def list_connections(
 762        self,
 763        name: str | None = None,
 764        *,
 765        name_filter: Callable | None = None,
 766    ) -> list[CloudConnection]:
 767        """List connections by name in the workspace.
 768
 769        TODO: Add pagination support
 770        """
 771        connections = api_util.list_connections(
 772            api_root=self.api_root,
 773            workspace_id=self.workspace_id,
 774            name=name,
 775            name_filter=name_filter,
 776            client_id=self.client_id,
 777            client_secret=self.client_secret,
 778            bearer_token=self.bearer_token,
 779        )
 780        return [
 781            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
 782                workspace=self,
 783                connection_response=connection,
 784            )
 785            for connection in connections
 786            if name is None or connection.name == name
 787        ]
 788
 789    def list_sources(
 790        self,
 791        name: str | None = None,
 792        *,
 793        name_filter: Callable | None = None,
 794    ) -> list[CloudSource]:
 795        """List all sources in the workspace.
 796
 797        TODO: Add pagination support
 798        """
 799        sources = api_util.list_sources(
 800            api_root=self.api_root,
 801            workspace_id=self.workspace_id,
 802            name=name,
 803            name_filter=name_filter,
 804            client_id=self.client_id,
 805            client_secret=self.client_secret,
 806            bearer_token=self.bearer_token,
 807        )
 808        return [
 809            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
 810                workspace=self,
 811                source_response=source,
 812            )
 813            for source in sources
 814            if name is None or source.name == name
 815        ]
 816
 817    def list_destinations(
 818        self,
 819        name: str | None = None,
 820        *,
 821        name_filter: Callable | None = None,
 822    ) -> list[CloudDestination]:
 823        """List all destinations in the workspace.
 824
 825        TODO: Add pagination support
 826        """
 827        destinations = api_util.list_destinations(
 828            api_root=self.api_root,
 829            workspace_id=self.workspace_id,
 830            name=name,
 831            name_filter=name_filter,
 832            client_id=self.client_id,
 833            client_secret=self.client_secret,
 834            bearer_token=self.bearer_token,
 835        )
 836        return [
 837            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
 838                workspace=self,
 839                destination_response=destination,
 840            )
 841            for destination in destinations
 842            if name is None or destination.name == name
 843        ]
 844
 845    def publish_custom_source_definition(
 846        self,
 847        name: str,
 848        *,
 849        manifest_yaml: dict[str, Any] | Path | str | None = None,
 850        docker_image: str | None = None,
 851        docker_tag: str | None = None,
 852        unique: bool = True,
 853        pre_validate: bool = True,
 854        testing_values: dict[str, Any] | None = None,
 855    ) -> CustomCloudSourceDefinition:
 856        """Publish a custom source connector definition.
 857
 858        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
 859        and docker_tag (for Docker connectors), but not both.
 860
 861        Args:
 862            name: Display name for the connector definition
 863            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
 864            docker_image: Docker repository (e.g., 'airbyte/source-custom')
 865            docker_tag: Docker image tag (e.g., '1.0.0')
 866            unique: Whether to enforce name uniqueness
 867            pre_validate: Whether to validate manifest client-side (YAML only)
 868            testing_values: Optional configuration values to use for testing in the
 869                Connector Builder UI. If provided, these values are stored as the complete
 870                testing values object for the connector builder project (replaces any existing
 871                values), allowing immediate test read operations.
 872
 873        Returns:
 874            CustomCloudSourceDefinition object representing the created definition
 875
 876        Raises:
 877            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
 878            AirbyteDuplicateResourcesError: If unique=True and name already exists
 879        """
 880        is_yaml = manifest_yaml is not None
 881        is_docker = docker_image is not None
 882
 883        if is_yaml == is_docker:
 884            raise exc.PyAirbyteInputError(
 885                message=(
 886                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
 887                    "docker_image + docker_tag (for Docker connectors), but not both"
 888                ),
 889                context={
 890                    "manifest_yaml_provided": is_yaml,
 891                    "docker_image_provided": is_docker,
 892                },
 893            )
 894
 895        if is_docker and docker_tag is None:
 896            raise exc.PyAirbyteInputError(
 897                message="docker_tag is required when docker_image is specified",
 898                context={"docker_image": docker_image},
 899            )
 900
 901        if unique:
 902            existing = self.list_custom_source_definitions(
 903                definition_type="yaml" if is_yaml else "docker",
 904            )
 905            if any(d.name == name for d in existing):
 906                raise exc.AirbyteDuplicateResourcesError(
 907                    resource_type="custom_source_definition",
 908                    resource_name=name,
 909                )
 910
 911        if is_yaml:
 912            manifest_dict: dict[str, Any]
 913            if isinstance(manifest_yaml, Path):
 914                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
 915            elif isinstance(manifest_yaml, str):
 916                manifest_dict = yaml.safe_load(manifest_yaml)
 917            elif manifest_yaml is not None:
 918                manifest_dict = manifest_yaml
 919            else:
 920                raise exc.PyAirbyteInputError(
 921                    message="manifest_yaml is required for YAML connectors",
 922                    context={"name": name},
 923                )
 924
 925            if pre_validate:
 926                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
 927
 928            result = api_util.create_custom_yaml_source_definition(
 929                name=name,
 930                workspace_id=self.workspace_id,
 931                manifest=manifest_dict,
 932                api_root=self.api_root,
 933                client_id=self.client_id,
 934                client_secret=self.client_secret,
 935                bearer_token=self.bearer_token,
 936            )
 937            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
 938                self, result
 939            )
 940
 941            # Set testing values if provided
 942            if testing_values is not None:
 943                custom_definition.set_testing_values(testing_values)
 944
 945            return custom_definition
 946
 947        raise NotImplementedError(
 948            "Docker custom source definitions are not yet supported. "
 949            "Only YAML manifest-based custom sources are currently available."
 950        )
 951
 952    def list_custom_source_definitions(
 953        self,
 954        *,
 955        definition_type: Literal["yaml", "docker"],
 956    ) -> list[CustomCloudSourceDefinition]:
 957        """List custom source connector definitions.
 958
 959        Args:
 960            definition_type: Connector type to list ("yaml" or "docker"). Required.
 961
 962        Returns:
 963            List of CustomCloudSourceDefinition objects matching the specified type
 964        """
 965        if definition_type == "yaml":
 966            yaml_definitions = api_util.list_custom_yaml_source_definitions(
 967                workspace_id=self.workspace_id,
 968                api_root=self.api_root,
 969                client_id=self.client_id,
 970                client_secret=self.client_secret,
 971                bearer_token=self.bearer_token,
 972            )
 973            return [
 974                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
 975                for d in yaml_definitions
 976            ]
 977
 978        raise NotImplementedError(
 979            "Docker custom source definitions are not yet supported. "
 980            "Only YAML manifest-based custom sources are currently available."
 981        )
 982
 983    def get_custom_source_definition(
 984        self,
 985        definition_id: str,
 986        *,
 987        definition_type: Literal["yaml", "docker"],
 988    ) -> CustomCloudSourceDefinition:
 989        """Get a specific custom source definition by ID.
 990
 991        Args:
 992            definition_id: The definition ID
 993            definition_type: Connector type ("yaml" or "docker"). Required.
 994
 995        Returns:
 996            CustomCloudSourceDefinition object
 997        """
 998        if definition_type == "yaml":
 999            result = api_util.get_custom_yaml_source_definition(
1000                workspace_id=self.workspace_id,
1001                definition_id=definition_id,
1002                api_root=self.api_root,
1003                client_id=self.client_id,
1004                client_secret=self.client_secret,
1005                bearer_token=self.bearer_token,
1006            )
1007            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
1008
1009        raise NotImplementedError(
1010            "Docker custom source definitions are not yet supported. "
1011            "Only YAML manifest-based custom sources are currently available."
1012        )
class CloudOrganization:
 74class CloudOrganization:
 75    """Information about an organization in Airbyte Cloud.
 76
 77    This class provides lazy loading of organization attributes including billing status.
 78    It is typically created via CloudWorkspace.get_organization().
 79    """
 80
 81    def __init__(
 82        self,
 83        organization_id: str,
 84        organization_name: str | None = None,
 85        email: str | None = None,
 86        *,
 87        api_root: str = api_util.CLOUD_API_ROOT,
 88        client_id: SecretString | None = None,
 89        client_secret: SecretString | None = None,
 90        bearer_token: SecretString | None = None,
 91    ) -> None:
 92        """Initialize a CloudOrganization.
 93
 94        Args:
 95            organization_id: The organization ID.
 96            organization_name: Display name of the organization.
 97            email: Email associated with the organization.
 98            api_root: The API root URL.
 99            client_id: OAuth client ID for authentication.
100            client_secret: OAuth client secret for authentication.
101            bearer_token: Bearer token for authentication (alternative to client credentials).
102        """
103        self.organization_id = organization_id
104        """The organization ID."""
105
106        self._organization_name = organization_name
107        """Display name of the organization."""
108
109        self._email = email
110        """Email associated with the organization."""
111
112        self._api_root = api_root
113        self._client_id = client_id
114        self._client_secret = client_secret
115        self._bearer_token = bearer_token
116
117        # Cached organization info (billing, etc.)
118        self._organization_info: dict[str, Any] | None = None
119        # Flag to remember if fetching organization info failed (e.g., permission issues)
120        self._organization_info_fetch_failed: bool = False
121
122    def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]:
123        """Fetch and cache organization info including billing status.
124
125        If fetching fails (e.g., due to permission issues), the failure is cached and
126        subsequent calls will return an empty dict without retrying.
127
128        Args:
129            force_refresh: If True, always fetch from the API even if cached.
130
131        Returns:
132            Dictionary containing organization info including billing data.
133            Returns empty dict if fetching failed or is not permitted.
134        """
135        # Reset failure flag if force_refresh is requested
136        if force_refresh:
137            self._organization_info_fetch_failed = False
138
139        # If we already know fetching failed, return empty dict without retrying
140        if self._organization_info_fetch_failed:
141            return {}
142
143        if not force_refresh and self._organization_info is not None:
144            return self._organization_info
145
146        try:
147            self._organization_info = api_util.get_organization_info(
148                organization_id=self.organization_id,
149                api_root=self._api_root,
150                client_id=self._client_id,
151                client_secret=self._client_secret,
152                bearer_token=self._bearer_token,
153            )
154        except Exception:
155            # Cache the failure so we don't retry on subsequent property accesses
156            self._organization_info_fetch_failed = True
157            return {}
158        else:
159            return self._organization_info
160
161    @property
162    def organization_name(self) -> str | None:
163        """Display name of the organization."""
164        if self._organization_name is not None:
165            return self._organization_name
166        # Try to fetch from API if not set (returns empty dict on failure)
167        info = self._fetch_organization_info()
168        return info.get("organizationName")
169
170    @property
171    def email(self) -> str | None:
172        """Email associated with the organization."""
173        if self._email is not None:
174            return self._email
175        # Try to fetch from API if not set (returns empty dict on failure)
176        info = self._fetch_organization_info()
177        return info.get("email")
178
179    @property
180    def payment_status(self) -> str | None:
181        """Payment status of the organization.
182
183        Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'.
184        When 'disabled', syncs are blocked due to unpaid invoices.
185        Returns None if billing info is not available (e.g., due to permission issues).
186        """
187        info = self._fetch_organization_info()
188        return (info.get("billing") or {}).get("paymentStatus")
189
190    @property
191    def subscription_status(self) -> str | None:
192        """Subscription status of the organization.
193
194        Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'.
195        Returns None if billing info is not available (e.g., due to permission issues).
196        """
197        info = self._fetch_organization_info()
198        return (info.get("billing") or {}).get("subscriptionStatus")
199
200    @property
201    def is_account_locked(self) -> bool:
202        """Whether the account is locked due to billing issues.
203
204        Returns True if payment_status is 'disabled'/'locked' or subscription_status is
205        'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.
206        """
207        return api_util.is_account_locked(self.payment_status, self.subscription_status)

Information about an organization in Airbyte Cloud.

This class provides lazy loading of organization attributes including billing status. It is typically created via CloudWorkspace.get_organization().

CloudOrganization( organization_id: str, organization_name: str | None = None, email: str | None = None, *, api_root: str = 'https://api.airbyte.com/v1', client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, bearer_token: airbyte.secrets.SecretString | None = None)
 81    def __init__(
 82        self,
 83        organization_id: str,
 84        organization_name: str | None = None,
 85        email: str | None = None,
 86        *,
 87        api_root: str = api_util.CLOUD_API_ROOT,
 88        client_id: SecretString | None = None,
 89        client_secret: SecretString | None = None,
 90        bearer_token: SecretString | None = None,
 91    ) -> None:
 92        """Initialize a CloudOrganization.
 93
 94        Args:
 95            organization_id: The organization ID.
 96            organization_name: Display name of the organization.
 97            email: Email associated with the organization.
 98            api_root: The API root URL.
 99            client_id: OAuth client ID for authentication.
100            client_secret: OAuth client secret for authentication.
101            bearer_token: Bearer token for authentication (alternative to client credentials).
102        """
103        self.organization_id = organization_id
104        """The organization ID."""
105
106        self._organization_name = organization_name
107        """Display name of the organization."""
108
109        self._email = email
110        """Email associated with the organization."""
111
112        self._api_root = api_root
113        self._client_id = client_id
114        self._client_secret = client_secret
115        self._bearer_token = bearer_token
116
117        # Cached organization info (billing, etc.)
118        self._organization_info: dict[str, Any] | None = None
119        # Flag to remember if fetching organization info failed (e.g., permission issues)
120        self._organization_info_fetch_failed: bool = False

Initialize a CloudOrganization.

Arguments:
  • organization_id: The organization ID.
  • organization_name: Display name of the organization.
  • email: Email associated with the organization.
  • api_root: The API root URL.
  • client_id: OAuth client ID for authentication.
  • client_secret: OAuth client secret for authentication.
  • bearer_token: Bearer token for authentication (alternative to client credentials).
organization_id

The organization ID.

organization_name: str | None
161    @property
162    def organization_name(self) -> str | None:
163        """Display name of the organization."""
164        if self._organization_name is not None:
165            return self._organization_name
166        # Try to fetch from API if not set (returns empty dict on failure)
167        info = self._fetch_organization_info()
168        return info.get("organizationName")

Display name of the organization.

email: str | None
170    @property
171    def email(self) -> str | None:
172        """Email associated with the organization."""
173        if self._email is not None:
174            return self._email
175        # Try to fetch from API if not set (returns empty dict on failure)
176        info = self._fetch_organization_info()
177        return info.get("email")

Email associated with the organization.

payment_status: str | None
179    @property
180    def payment_status(self) -> str | None:
181        """Payment status of the organization.
182
183        Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'.
184        When 'disabled', syncs are blocked due to unpaid invoices.
185        Returns None if billing info is not available (e.g., due to permission issues).
186        """
187        info = self._fetch_organization_info()
188        return (info.get("billing") or {}).get("paymentStatus")

Payment status of the organization.

Possible values: 'uninitialized', 'okay', 'grace_period', 'disabled', 'locked', 'manual'. When 'disabled', syncs are blocked due to unpaid invoices. Returns None if billing info is not available (e.g., due to permission issues).

subscription_status: str | None
190    @property
191    def subscription_status(self) -> str | None:
192        """Subscription status of the organization.
193
194        Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'.
195        Returns None if billing info is not available (e.g., due to permission issues).
196        """
197        info = self._fetch_organization_info()
198        return (info.get("billing") or {}).get("subscriptionStatus")

Subscription status of the organization.

Possible values: 'pre_subscription', 'subscribed', 'unsubscribed'. Returns None if billing info is not available (e.g., due to permission issues).

is_account_locked: bool
200    @property
201    def is_account_locked(self) -> bool:
202        """Whether the account is locked due to billing issues.
203
204        Returns True if payment_status is 'disabled'/'locked' or subscription_status is
205        'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.
206        """
207        return api_util.is_account_locked(self.payment_status, self.subscription_status)

Whether the account is locked due to billing issues.

Returns True if payment_status is 'disabled'/'locked' or subscription_status is 'unsubscribed'. Defaults to False unless we have affirmative evidence of a locked state.

@dataclass
class CloudWorkspace:
 210@dataclass
 211class CloudWorkspace:
 212    """A remote workspace on the Airbyte Cloud.
 213
 214    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 215    instances, both OSS and Enterprise.
 216
 217    Two authentication methods are supported (mutually exclusive):
 218    1. OAuth2 client credentials (client_id + client_secret)
 219    2. Bearer token authentication
 220
 221    Example with client credentials:
 222        ```python
 223        workspace = CloudWorkspace(
 224            workspace_id="...",
 225            client_id="...",
 226            client_secret="...",
 227        )
 228        ```
 229
 230    Example with bearer token:
 231        ```python
 232        workspace = CloudWorkspace(
 233            workspace_id="...",
 234            bearer_token="...",
 235        )
 236        ```
 237    """
 238
 239    workspace_id: str
 240    client_id: SecretString | None = None
 241    client_secret: SecretString | None = None
 242    api_root: str = api_util.CLOUD_API_ROOT
 243    bearer_token: SecretString | None = None
 244
 245    # Internal credentials object (set in __post_init__, excluded from __init__)
 246    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)
 247
 248    def __post_init__(self) -> None:
 249        """Validate and initialize credentials."""
 250        # Wrap secrets in SecretString if provided
 251        if self.client_id is not None:
 252            self.client_id = SecretString(self.client_id)
 253        if self.client_secret is not None:
 254            self.client_secret = SecretString(self.client_secret)
 255        if self.bearer_token is not None:
 256            self.bearer_token = SecretString(self.bearer_token)
 257
 258        # Create internal CloudClientConfig object (validates mutual exclusivity)
 259        self._credentials = CloudClientConfig(
 260            client_id=self.client_id,
 261            client_secret=self.client_secret,
 262            bearer_token=self.bearer_token,
 263            api_root=self.api_root,
 264        )
 265
 266    @classmethod
 267    def from_env(
 268        cls,
 269        workspace_id: str | None = None,
 270        *,
 271        api_root: str | None = None,
 272    ) -> CloudWorkspace:
 273        """Create a CloudWorkspace using credentials from environment variables.
 274
 275        This factory method resolves credentials from environment variables,
 276        providing a convenient way to create a workspace without explicitly
 277        passing credentials.
 278
 279        Two authentication methods are supported (mutually exclusive):
 280        1. Bearer token (checked first)
 281        2. OAuth2 client credentials (fallback)
 282
 283        Environment variables used:
 284            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
 285            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
 286            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
 287            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
 288            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
 289
 290        Args:
 291            workspace_id: The workspace ID. If not provided, will be resolved from
 292                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
 293            api_root: The API root URL. If not provided, will be resolved from
 294                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
 295                the Airbyte Cloud API.
 296
 297        Returns:
 298            A CloudWorkspace instance configured with credentials from the environment.
 299
 300        Raises:
 301            PyAirbyteSecretNotFoundError: If required credentials are not found in
 302                the environment.
 303
 304        Example:
 305            ```python
 306            # With workspace_id from environment
 307            workspace = CloudWorkspace.from_env()
 308
 309            # With explicit workspace_id
 310            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
 311            ```
 312        """
 313        resolved_api_root = resolve_cloud_api_url(api_root)
 314
 315        # Try bearer token first
 316        bearer_token = resolve_cloud_bearer_token()
 317        if bearer_token:
 318            return cls(
 319                workspace_id=resolve_cloud_workspace_id(workspace_id),
 320                bearer_token=bearer_token,
 321                api_root=resolved_api_root,
 322            )
 323
 324        # Fall back to client credentials
 325        return cls(
 326            workspace_id=resolve_cloud_workspace_id(workspace_id),
 327            client_id=resolve_cloud_client_id(),
 328            client_secret=resolve_cloud_client_secret(),
 329            api_root=resolved_api_root,
 330        )
 331
 332    @property
 333    def workspace_url(self) -> str | None:
 334        """The web URL of the workspace."""
 335        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 336
 337    @cached_property
 338    def _organization_info(self) -> dict[str, Any]:
 339        """Fetch and cache organization info for this workspace.
 340
 341        Uses the Config API endpoint for an efficient O(1) lookup.
 342        This is an internal method; use get_organization() for public access.
 343        """
 344        return api_util.get_workspace_organization_info(
 345            workspace_id=self.workspace_id,
 346            api_root=self.api_root,
 347            client_id=self.client_id,
 348            client_secret=self.client_secret,
 349            bearer_token=self.bearer_token,
 350        )
 351
 352    @overload
 353    def get_organization(self) -> CloudOrganization: ...
 354
 355    @overload
 356    def get_organization(
 357        self,
 358        *,
 359        raise_on_error: Literal[True],
 360    ) -> CloudOrganization: ...
 361
 362    @overload
 363    def get_organization(
 364        self,
 365        *,
 366        raise_on_error: Literal[False],
 367    ) -> CloudOrganization | None: ...
 368
 369    def get_organization(
 370        self,
 371        *,
 372        raise_on_error: bool = True,
 373    ) -> CloudOrganization | None:
 374        """Get the organization this workspace belongs to.
 375
 376        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
 377        which may not be available with workspace-scoped credentials.
 378
 379        Args:
 380            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
 381                If False, returns None instead of raising.
 382
 383        Returns:
 384            CloudOrganization object with organization_id and organization_name,
 385            or None if raise_on_error=False and an error occurred.
 386
 387        Raises:
 388            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
 389                (e.g., due to insufficient permissions or missing data).
 390        """
 391        try:
 392            info = self._organization_info
 393        except (AirbyteError, NotImplementedError):
 394            if raise_on_error:
 395                raise
 396            return None
 397
 398        organization_id = info.get("organizationId")
 399        organization_name = info.get("organizationName")
 400
 401        # Validate that both organization_id and organization_name are non-null and non-empty
 402        if not organization_id or not organization_name:
 403            if raise_on_error:
 404                raise AirbyteError(
 405                    message="Organization info is incomplete.",
 406                    context={
 407                        "organization_id": organization_id,
 408                        "organization_name": organization_name,
 409                    },
 410                )
 411            return None
 412
 413        return CloudOrganization(
 414            organization_id=organization_id,
 415            organization_name=organization_name,
 416            api_root=self.api_root,
 417            client_id=self.client_id,
 418            client_secret=self.client_secret,
 419            bearer_token=self.bearer_token,
 420        )
 421
 422    # Test connection and creds
 423
 424    def connect(self) -> None:
 425        """Check that the workspace is reachable and raise an exception otherwise.
 426
 427        Note: It is not necessary to call this method before calling other operations. It
 428              serves primarily as a simple check to ensure that the workspace is reachable
 429              and credentials are correct.
 430        """
 431        _ = api_util.get_workspace(
 432            api_root=self.api_root,
 433            workspace_id=self.workspace_id,
 434            client_id=self.client_id,
 435            client_secret=self.client_secret,
 436            bearer_token=self.bearer_token,
 437        )
 438        print(f"Successfully connected to workspace: {self.workspace_url}")
 439
 440    # Get sources, destinations, and connections
 441
 442    def get_connection(
 443        self,
 444        connection_id: str,
 445    ) -> CloudConnection:
 446        """Get a connection by ID.
 447
 448        This method does not fetch data from the API. It returns a `CloudConnection` object,
 449        which will be loaded lazily as needed.
 450        """
 451        return CloudConnection(
 452            workspace=self,
 453            connection_id=connection_id,
 454        )
 455
 456    def get_source(
 457        self,
 458        source_id: str,
 459    ) -> CloudSource:
 460        """Get a source by ID.
 461
 462        This method does not fetch data from the API. It returns a `CloudSource` object,
 463        which will be loaded lazily as needed.
 464        """
 465        return CloudSource(
 466            workspace=self,
 467            connector_id=source_id,
 468        )
 469
 470    def get_destination(
 471        self,
 472        destination_id: str,
 473    ) -> CloudDestination:
 474        """Get a destination by ID.
 475
 476        This method does not fetch data from the API. It returns a `CloudDestination` object,
 477        which will be loaded lazily as needed.
 478        """
 479        return CloudDestination(
 480            workspace=self,
 481            connector_id=destination_id,
 482        )
 483
 484    # Deploy sources and destinations
 485
 486    def deploy_source(
 487        self,
 488        name: str,
 489        source: Source,
 490        *,
 491        unique: bool = True,
 492        random_name_suffix: bool = False,
 493    ) -> CloudSource:
 494        """Deploy a source to the workspace.
 495
 496        Returns the newly deployed source.
 497
 498        Args:
 499            name: The name to use when deploying.
 500            source: The source object to deploy.
 501            unique: Whether to require a unique name. If `True`, duplicate names
 502                are not allowed. Defaults to `True`.
 503            random_name_suffix: Whether to append a random suffix to the name.
 504        """
 505        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
 506        source_config_dict["sourceType"] = source.name.replace("source-", "")
 507
 508        if random_name_suffix:
 509            name += f" (ID: {text_util.generate_random_suffix()})"
 510
 511        if unique:
 512            existing = self.list_sources(name=name)
 513            if existing:
 514                raise exc.AirbyteDuplicateResourcesError(
 515                    resource_type="source",
 516                    resource_name=name,
 517                )
 518
 519        deployed_source = api_util.create_source(
 520            name=name,
 521            api_root=self.api_root,
 522            workspace_id=self.workspace_id,
 523            config=source_config_dict,
 524            client_id=self.client_id,
 525            client_secret=self.client_secret,
 526            bearer_token=self.bearer_token,
 527        )
 528        return CloudSource(
 529            workspace=self,
 530            connector_id=deployed_source.source_id,
 531        )
 532
 533    def deploy_destination(
 534        self,
 535        name: str,
 536        destination: Destination | dict[str, Any],
 537        *,
 538        unique: bool = True,
 539        random_name_suffix: bool = False,
 540    ) -> CloudDestination:
 541        """Deploy a destination to the workspace.
 542
 543        Returns the newly deployed destination ID.
 544
 545        Args:
 546            name: The name to use when deploying.
 547            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
 548                dictionary of configuration values.
 549            unique: Whether to require a unique name. If `True`, duplicate names
 550                are not allowed. Defaults to `True`.
 551            random_name_suffix: Whether to append a random suffix to the name.
 552        """
 553        if isinstance(destination, Destination):
 554            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
 555            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
 556            # raise ValueError(destination_conf_dict)
 557        else:
 558            destination_conf_dict = destination.copy()
 559            if "destinationType" not in destination_conf_dict:
 560                raise exc.PyAirbyteInputError(
 561                    message="Missing `destinationType` in configuration dictionary.",
 562                )
 563
 564        if random_name_suffix:
 565            name += f" (ID: {text_util.generate_random_suffix()})"
 566
 567        if unique:
 568            existing = self.list_destinations(name=name)
 569            if existing:
 570                raise exc.AirbyteDuplicateResourcesError(
 571                    resource_type="destination",
 572                    resource_name=name,
 573                )
 574
 575        deployed_destination = api_util.create_destination(
 576            name=name,
 577            api_root=self.api_root,
 578            workspace_id=self.workspace_id,
 579            config=destination_conf_dict,  # Wants a dataclass but accepts dict
 580            client_id=self.client_id,
 581            client_secret=self.client_secret,
 582            bearer_token=self.bearer_token,
 583        )
 584        return CloudDestination(
 585            workspace=self,
 586            connector_id=deployed_destination.destination_id,
 587        )
 588
 589    def permanently_delete_source(
 590        self,
 591        source: str | CloudSource,
 592        *,
 593        safe_mode: bool = True,
 594    ) -> None:
 595        """Delete a source from the workspace.
 596
 597        You can pass either the source ID `str` or a deployed `Source` object.
 598
 599        Args:
 600            source: The source ID or CloudSource object to delete
 601            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
 602                (case insensitive) to prevent accidental deletion. Defaults to True.
 603        """
 604        if not isinstance(source, (str, CloudSource)):
 605            raise exc.PyAirbyteInputError(
 606                message="Invalid source type.",
 607                input_value=type(source).__name__,
 608            )
 609
 610        api_util.delete_source(
 611            source_id=source.connector_id if isinstance(source, CloudSource) else source,
 612            source_name=source.name if isinstance(source, CloudSource) else None,
 613            api_root=self.api_root,
 614            client_id=self.client_id,
 615            client_secret=self.client_secret,
 616            bearer_token=self.bearer_token,
 617            safe_mode=safe_mode,
 618        )
 619
 620    # Deploy and delete destinations
 621
 622    def permanently_delete_destination(
 623        self,
 624        destination: str | CloudDestination,
 625        *,
 626        safe_mode: bool = True,
 627    ) -> None:
 628        """Delete a deployed destination from the workspace.
 629
 630        You can pass either the `Cache` class or the deployed destination ID as a `str`.
 631
 632        Args:
 633            destination: The destination ID or CloudDestination object to delete
 634            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
 635                (case insensitive) to prevent accidental deletion. Defaults to True.
 636        """
 637        if not isinstance(destination, (str, CloudDestination)):
 638            raise exc.PyAirbyteInputError(
 639                message="Invalid destination type.",
 640                input_value=type(destination).__name__,
 641            )
 642
 643        api_util.delete_destination(
 644            destination_id=(
 645                destination if isinstance(destination, str) else destination.destination_id
 646            ),
 647            destination_name=(
 648                destination.name if isinstance(destination, CloudDestination) else None
 649            ),
 650            api_root=self.api_root,
 651            client_id=self.client_id,
 652            client_secret=self.client_secret,
 653            bearer_token=self.bearer_token,
 654            safe_mode=safe_mode,
 655        )
 656
 657    # Deploy and delete connections
 658
 659    def deploy_connection(
 660        self,
 661        connection_name: str,
 662        *,
 663        source: CloudSource | str,
 664        selected_streams: list[str],
 665        destination: CloudDestination | str,
 666        table_prefix: str | None = None,
 667    ) -> CloudConnection:
 668        """Create a new connection between an already deployed source and destination.
 669
 670        Returns the newly deployed connection object.
 671
 672        Args:
 673            connection_name: The name of the connection.
 674            source: The deployed source. You can pass a source ID or a CloudSource object.
 675            destination: The deployed destination. You can pass a destination ID or a
 676                CloudDestination object.
 677            table_prefix: Optional. The table prefix to use when syncing to the destination.
 678            selected_streams: The selected stream names to sync within the connection.
 679        """
 680        if not selected_streams:
 681            raise exc.PyAirbyteInputError(
 682                guidance="You must provide `selected_streams` when creating a connection."
 683            )
 684
 685        source_id: str = source if isinstance(source, str) else source.connector_id
 686        destination_id: str = (
 687            destination if isinstance(destination, str) else destination.connector_id
 688        )
 689
 690        deployed_connection = api_util.create_connection(
 691            name=connection_name,
 692            source_id=source_id,
 693            destination_id=destination_id,
 694            api_root=self.api_root,
 695            workspace_id=self.workspace_id,
 696            selected_stream_names=selected_streams,
 697            prefix=table_prefix or "",
 698            client_id=self.client_id,
 699            client_secret=self.client_secret,
 700            bearer_token=self.bearer_token,
 701        )
 702
 703        return CloudConnection(
 704            workspace=self,
 705            connection_id=deployed_connection.connection_id,
 706            source=deployed_connection.source_id,
 707            destination=deployed_connection.destination_id,
 708        )
 709
 710    def permanently_delete_connection(
 711        self,
 712        connection: str | CloudConnection,
 713        *,
 714        cascade_delete_source: bool = False,
 715        cascade_delete_destination: bool = False,
 716        safe_mode: bool = True,
 717    ) -> None:
 718        """Delete a deployed connection from the workspace.
 719
 720        Args:
 721            connection: The connection ID or CloudConnection object to delete
 722            cascade_delete_source: If True, also delete the source after deleting the connection
 723            cascade_delete_destination: If True, also delete the destination after deleting
 724                the connection
 725            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
 726                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
 727                to cascade deletes.
 728        """
 729        if connection is None:
 730            raise ValueError("No connection ID provided.")
 731
 732        if isinstance(connection, str):
 733            connection = CloudConnection(
 734                workspace=self,
 735                connection_id=connection,
 736            )
 737
 738        api_util.delete_connection(
 739            connection_id=connection.connection_id,
 740            connection_name=connection.name,
 741            api_root=self.api_root,
 742            workspace_id=self.workspace_id,
 743            client_id=self.client_id,
 744            client_secret=self.client_secret,
 745            bearer_token=self.bearer_token,
 746            safe_mode=safe_mode,
 747        )
 748
 749        if cascade_delete_source:
 750            self.permanently_delete_source(
 751                source=connection.source_id,
 752                safe_mode=safe_mode,
 753            )
 754        if cascade_delete_destination:
 755            self.permanently_delete_destination(
 756                destination=connection.destination_id,
 757                safe_mode=safe_mode,
 758            )
 759
 760    # List sources, destinations, and connections
 761
 762    def list_connections(
 763        self,
 764        name: str | None = None,
 765        *,
 766        name_filter: Callable | None = None,
 767    ) -> list[CloudConnection]:
 768        """List connections by name in the workspace.
 769
 770        TODO: Add pagination support
 771        """
 772        connections = api_util.list_connections(
 773            api_root=self.api_root,
 774            workspace_id=self.workspace_id,
 775            name=name,
 776            name_filter=name_filter,
 777            client_id=self.client_id,
 778            client_secret=self.client_secret,
 779            bearer_token=self.bearer_token,
 780        )
 781        return [
 782            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
 783                workspace=self,
 784                connection_response=connection,
 785            )
 786            for connection in connections
 787            if name is None or connection.name == name
 788        ]
 789
 790    def list_sources(
 791        self,
 792        name: str | None = None,
 793        *,
 794        name_filter: Callable | None = None,
 795    ) -> list[CloudSource]:
 796        """List all sources in the workspace.
 797
 798        TODO: Add pagination support
 799        """
 800        sources = api_util.list_sources(
 801            api_root=self.api_root,
 802            workspace_id=self.workspace_id,
 803            name=name,
 804            name_filter=name_filter,
 805            client_id=self.client_id,
 806            client_secret=self.client_secret,
 807            bearer_token=self.bearer_token,
 808        )
 809        return [
 810            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
 811                workspace=self,
 812                source_response=source,
 813            )
 814            for source in sources
 815            if name is None or source.name == name
 816        ]
 817
 818    def list_destinations(
 819        self,
 820        name: str | None = None,
 821        *,
 822        name_filter: Callable | None = None,
 823    ) -> list[CloudDestination]:
 824        """List all destinations in the workspace.
 825
 826        TODO: Add pagination support
 827        """
 828        destinations = api_util.list_destinations(
 829            api_root=self.api_root,
 830            workspace_id=self.workspace_id,
 831            name=name,
 832            name_filter=name_filter,
 833            client_id=self.client_id,
 834            client_secret=self.client_secret,
 835            bearer_token=self.bearer_token,
 836        )
 837        return [
 838            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
 839                workspace=self,
 840                destination_response=destination,
 841            )
 842            for destination in destinations
 843            if name is None or destination.name == name
 844        ]
 845
 846    def publish_custom_source_definition(
 847        self,
 848        name: str,
 849        *,
 850        manifest_yaml: dict[str, Any] | Path | str | None = None,
 851        docker_image: str | None = None,
 852        docker_tag: str | None = None,
 853        unique: bool = True,
 854        pre_validate: bool = True,
 855        testing_values: dict[str, Any] | None = None,
 856    ) -> CustomCloudSourceDefinition:
 857        """Publish a custom source connector definition.
 858
 859        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
 860        and docker_tag (for Docker connectors), but not both.
 861
 862        Args:
 863            name: Display name for the connector definition
 864            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
 865            docker_image: Docker repository (e.g., 'airbyte/source-custom')
 866            docker_tag: Docker image tag (e.g., '1.0.0')
 867            unique: Whether to enforce name uniqueness
 868            pre_validate: Whether to validate manifest client-side (YAML only)
 869            testing_values: Optional configuration values to use for testing in the
 870                Connector Builder UI. If provided, these values are stored as the complete
 871                testing values object for the connector builder project (replaces any existing
 872                values), allowing immediate test read operations.
 873
 874        Returns:
 875            CustomCloudSourceDefinition object representing the created definition
 876
 877        Raises:
 878            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
 879            AirbyteDuplicateResourcesError: If unique=True and name already exists
 880        """
 881        is_yaml = manifest_yaml is not None
 882        is_docker = docker_image is not None
 883
 884        if is_yaml == is_docker:
 885            raise exc.PyAirbyteInputError(
 886                message=(
 887                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
 888                    "docker_image + docker_tag (for Docker connectors), but not both"
 889                ),
 890                context={
 891                    "manifest_yaml_provided": is_yaml,
 892                    "docker_image_provided": is_docker,
 893                },
 894            )
 895
 896        if is_docker and docker_tag is None:
 897            raise exc.PyAirbyteInputError(
 898                message="docker_tag is required when docker_image is specified",
 899                context={"docker_image": docker_image},
 900            )
 901
 902        if unique:
 903            existing = self.list_custom_source_definitions(
 904                definition_type="yaml" if is_yaml else "docker",
 905            )
 906            if any(d.name == name for d in existing):
 907                raise exc.AirbyteDuplicateResourcesError(
 908                    resource_type="custom_source_definition",
 909                    resource_name=name,
 910                )
 911
 912        if is_yaml:
 913            manifest_dict: dict[str, Any]
 914            if isinstance(manifest_yaml, Path):
 915                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
 916            elif isinstance(manifest_yaml, str):
 917                manifest_dict = yaml.safe_load(manifest_yaml)
 918            elif manifest_yaml is not None:
 919                manifest_dict = manifest_yaml
 920            else:
 921                raise exc.PyAirbyteInputError(
 922                    message="manifest_yaml is required for YAML connectors",
 923                    context={"name": name},
 924                )
 925
 926            if pre_validate:
 927                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
 928
 929            result = api_util.create_custom_yaml_source_definition(
 930                name=name,
 931                workspace_id=self.workspace_id,
 932                manifest=manifest_dict,
 933                api_root=self.api_root,
 934                client_id=self.client_id,
 935                client_secret=self.client_secret,
 936                bearer_token=self.bearer_token,
 937            )
 938            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
 939                self, result
 940            )
 941
 942            # Set testing values if provided
 943            if testing_values is not None:
 944                custom_definition.set_testing_values(testing_values)
 945
 946            return custom_definition
 947
 948        raise NotImplementedError(
 949            "Docker custom source definitions are not yet supported. "
 950            "Only YAML manifest-based custom sources are currently available."
 951        )
 952
 953    def list_custom_source_definitions(
 954        self,
 955        *,
 956        definition_type: Literal["yaml", "docker"],
 957    ) -> list[CustomCloudSourceDefinition]:
 958        """List custom source connector definitions.
 959
 960        Args:
 961            definition_type: Connector type to list ("yaml" or "docker"). Required.
 962
 963        Returns:
 964            List of CustomCloudSourceDefinition objects matching the specified type
 965        """
 966        if definition_type == "yaml":
 967            yaml_definitions = api_util.list_custom_yaml_source_definitions(
 968                workspace_id=self.workspace_id,
 969                api_root=self.api_root,
 970                client_id=self.client_id,
 971                client_secret=self.client_secret,
 972                bearer_token=self.bearer_token,
 973            )
 974            return [
 975                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
 976                for d in yaml_definitions
 977            ]
 978
 979        raise NotImplementedError(
 980            "Docker custom source definitions are not yet supported. "
 981            "Only YAML manifest-based custom sources are currently available."
 982        )
 983
 984    def get_custom_source_definition(
 985        self,
 986        definition_id: str,
 987        *,
 988        definition_type: Literal["yaml", "docker"],
 989    ) -> CustomCloudSourceDefinition:
 990        """Get a specific custom source definition by ID.
 991
 992        Args:
 993            definition_id: The definition ID
 994            definition_type: Connector type ("yaml" or "docker"). Required.
 995
 996        Returns:
 997            CustomCloudSourceDefinition object
 998        """
 999        if definition_type == "yaml":
1000            result = api_util.get_custom_yaml_source_definition(
1001                workspace_id=self.workspace_id,
1002                definition_id=definition_id,
1003                api_root=self.api_root,
1004                client_id=self.client_id,
1005                client_secret=self.client_secret,
1006                bearer_token=self.bearer_token,
1007            )
1008            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
1009
1010        raise NotImplementedError(
1011            "Docker custom source definitions are not yet supported. "
1012            "Only YAML manifest-based custom sources are currently available."
1013        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
Example with bearer token:
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1', bearer_token: airbyte.secrets.SecretString | None = None)
workspace_id: str
client_id: airbyte.secrets.SecretString | None = None
client_secret: airbyte.secrets.SecretString | None = None
api_root: str = 'https://api.airbyte.com/v1'
bearer_token: airbyte.secrets.SecretString | None = None
@classmethod
def from_env( cls, workspace_id: str | None = None, *, api_root: str | None = None) -> CloudWorkspace:
266    @classmethod
267    def from_env(
268        cls,
269        workspace_id: str | None = None,
270        *,
271        api_root: str | None = None,
272    ) -> CloudWorkspace:
273        """Create a CloudWorkspace using credentials from environment variables.
274
275        This factory method resolves credentials from environment variables,
276        providing a convenient way to create a workspace without explicitly
277        passing credentials.
278
279        Two authentication methods are supported (mutually exclusive):
280        1. Bearer token (checked first)
281        2. OAuth2 client credentials (fallback)
282
283        Environment variables used:
284            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
285            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
286            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
287            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
288            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
289
290        Args:
291            workspace_id: The workspace ID. If not provided, will be resolved from
292                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
293            api_root: The API root URL. If not provided, will be resolved from
294                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
295                the Airbyte Cloud API.
296
297        Returns:
298            A CloudWorkspace instance configured with credentials from the environment.
299
300        Raises:
301            PyAirbyteSecretNotFoundError: If required credentials are not found in
302                the environment.
303
304        Example:
305            ```python
306            # With workspace_id from environment
307            workspace = CloudWorkspace.from_env()
308
309            # With explicit workspace_id
310            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
311            ```
312        """
313        resolved_api_root = resolve_cloud_api_url(api_root)
314
315        # Try bearer token first
316        bearer_token = resolve_cloud_bearer_token()
317        if bearer_token:
318            return cls(
319                workspace_id=resolve_cloud_workspace_id(workspace_id),
320                bearer_token=bearer_token,
321                api_root=resolved_api_root,
322            )
323
324        # Fall back to client credentials
325        return cls(
326            workspace_id=resolve_cloud_workspace_id(workspace_id),
327            client_id=resolve_cloud_client_id(),
328            client_secret=resolve_cloud_client_secret(),
329            api_root=resolved_api_root,
330        )

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)
Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
workspace_url: str | None
332    @property
333    def workspace_url(self) -> str | None:
334        """The web URL of the workspace."""
335        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def get_organization( self, *, raise_on_error: bool = True) -> CloudOrganization | None:
369    def get_organization(
370        self,
371        *,
372        raise_on_error: bool = True,
373    ) -> CloudOrganization | None:
374        """Get the organization this workspace belongs to.
375
376        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
377        which may not be available with workspace-scoped credentials.
378
379        Args:
380            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
381                If False, returns None instead of raising.
382
383        Returns:
384            CloudOrganization object with organization_id and organization_name,
385            or None if raise_on_error=False and an error occurred.
386
387        Raises:
388            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
389                (e.g., due to insufficient permissions or missing data).
390        """
391        try:
392            info = self._organization_info
393        except (AirbyteError, NotImplementedError):
394            if raise_on_error:
395                raise
396            return None
397
398        organization_id = info.get("organizationId")
399        organization_name = info.get("organizationName")
400
401        # Validate that both organization_id and organization_name are non-null and non-empty
402        if not organization_id or not organization_name:
403            if raise_on_error:
404                raise AirbyteError(
405                    message="Organization info is incomplete.",
406                    context={
407                        "organization_id": organization_id,
408                        "organization_name": organization_name,
409                    },
410                )
411            return None
412
413        return CloudOrganization(
414            organization_id=organization_id,
415            organization_name=organization_name,
416            api_root=self.api_root,
417            client_id=self.client_id,
418            client_secret=self.client_secret,
419            bearer_token=self.bearer_token,
420        )

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
def connect(self) -> None:
424    def connect(self) -> None:
425        """Check that the workspace is reachable and raise an exception otherwise.
426
427        Note: It is not necessary to call this method before calling other operations. It
428              serves primarily as a simple check to ensure that the workspace is reachable
429              and credentials are correct.
430        """
431        _ = api_util.get_workspace(
432            api_root=self.api_root,
433            workspace_id=self.workspace_id,
434            client_id=self.client_id,
435            client_secret=self.client_secret,
436            bearer_token=self.bearer_token,
437        )
438        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
442    def get_connection(
443        self,
444        connection_id: str,
445    ) -> CloudConnection:
446        """Get a connection by ID.
447
448        This method does not fetch data from the API. It returns a `CloudConnection` object,
449        which will be loaded lazily as needed.
450        """
451        return CloudConnection(
452            workspace=self,
453            connection_id=connection_id,
454        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
456    def get_source(
457        self,
458        source_id: str,
459    ) -> CloudSource:
460        """Get a source by ID.
461
462        This method does not fetch data from the API. It returns a `CloudSource` object,
463        which will be loaded lazily as needed.
464        """
465        return CloudSource(
466            workspace=self,
467            connector_id=source_id,
468        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
470    def get_destination(
471        self,
472        destination_id: str,
473    ) -> CloudDestination:
474        """Get a destination by ID.
475
476        This method does not fetch data from the API. It returns a `CloudDestination` object,
477        which will be loaded lazily as needed.
478        """
479        return CloudDestination(
480            workspace=self,
481            connector_id=destination_id,
482        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
486    def deploy_source(
487        self,
488        name: str,
489        source: Source,
490        *,
491        unique: bool = True,
492        random_name_suffix: bool = False,
493    ) -> CloudSource:
494        """Deploy a source to the workspace.
495
496        Returns the newly deployed source.
497
498        Args:
499            name: The name to use when deploying.
500            source: The source object to deploy.
501            unique: Whether to require a unique name. If `True`, duplicate names
502                are not allowed. Defaults to `True`.
503            random_name_suffix: Whether to append a random suffix to the name.
504        """
505        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
506        source_config_dict["sourceType"] = source.name.replace("source-", "")
507
508        if random_name_suffix:
509            name += f" (ID: {text_util.generate_random_suffix()})"
510
511        if unique:
512            existing = self.list_sources(name=name)
513            if existing:
514                raise exc.AirbyteDuplicateResourcesError(
515                    resource_type="source",
516                    resource_name=name,
517                )
518
519        deployed_source = api_util.create_source(
520            name=name,
521            api_root=self.api_root,
522            workspace_id=self.workspace_id,
523            config=source_config_dict,
524            client_id=self.client_id,
525            client_secret=self.client_secret,
526            bearer_token=self.bearer_token,
527        )
528        return CloudSource(
529            workspace=self,
530            connector_id=deployed_source.source_id,
531        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
533    def deploy_destination(
534        self,
535        name: str,
536        destination: Destination | dict[str, Any],
537        *,
538        unique: bool = True,
539        random_name_suffix: bool = False,
540    ) -> CloudDestination:
541        """Deploy a destination to the workspace.
542
543        Returns the newly deployed destination ID.
544
545        Args:
546            name: The name to use when deploying.
547            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
548                dictionary of configuration values.
549            unique: Whether to require a unique name. If `True`, duplicate names
550                are not allowed. Defaults to `True`.
551            random_name_suffix: Whether to append a random suffix to the name.
552        """
553        if isinstance(destination, Destination):
554            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
555            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
556            # raise ValueError(destination_conf_dict)
557        else:
558            destination_conf_dict = destination.copy()
559            if "destinationType" not in destination_conf_dict:
560                raise exc.PyAirbyteInputError(
561                    message="Missing `destinationType` in configuration dictionary.",
562                )
563
564        if random_name_suffix:
565            name += f" (ID: {text_util.generate_random_suffix()})"
566
567        if unique:
568            existing = self.list_destinations(name=name)
569            if existing:
570                raise exc.AirbyteDuplicateResourcesError(
571                    resource_type="destination",
572                    resource_name=name,
573                )
574
575        deployed_destination = api_util.create_destination(
576            name=name,
577            api_root=self.api_root,
578            workspace_id=self.workspace_id,
579            config=destination_conf_dict,  # Wants a dataclass but accepts dict
580            client_id=self.client_id,
581            client_secret=self.client_secret,
582            bearer_token=self.bearer_token,
583        )
584        return CloudDestination(
585            workspace=self,
586            connector_id=deployed_destination.destination_id,
587        )

Deploy a destination to the workspace.

Returns the newly deployed destination ID.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
589    def permanently_delete_source(
590        self,
591        source: str | CloudSource,
592        *,
593        safe_mode: bool = True,
594    ) -> None:
595        """Delete a source from the workspace.
596
597        You can pass either the source ID `str` or a deployed `Source` object.
598
599        Args:
600            source: The source ID or CloudSource object to delete
601            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
602                (case insensitive) to prevent accidental deletion. Defaults to True.
603        """
604        if not isinstance(source, (str, CloudSource)):
605            raise exc.PyAirbyteInputError(
606                message="Invalid source type.",
607                input_value=type(source).__name__,
608            )
609
610        api_util.delete_source(
611            source_id=source.connector_id if isinstance(source, CloudSource) else source,
612            source_name=source.name if isinstance(source, CloudSource) else None,
613            api_root=self.api_root,
614            client_id=self.client_id,
615            client_secret=self.client_secret,
616            bearer_token=self.bearer_token,
617            safe_mode=safe_mode,
618        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
622    def permanently_delete_destination(
623        self,
624        destination: str | CloudDestination,
625        *,
626        safe_mode: bool = True,
627    ) -> None:
628        """Delete a deployed destination from the workspace.
629
630        You can pass either the `Cache` class or the deployed destination ID as a `str`.
631
632        Args:
633            destination: The destination ID or CloudDestination object to delete
634            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
635                (case insensitive) to prevent accidental deletion. Defaults to True.
636        """
637        if not isinstance(destination, (str, CloudDestination)):
638            raise exc.PyAirbyteInputError(
639                message="Invalid destination type.",
640                input_value=type(destination).__name__,
641            )
642
643        api_util.delete_destination(
644            destination_id=(
645                destination if isinstance(destination, str) else destination.destination_id
646            ),
647            destination_name=(
648                destination.name if isinstance(destination, CloudDestination) else None
649            ),
650            api_root=self.api_root,
651            client_id=self.client_id,
652            client_secret=self.client_secret,
653            bearer_token=self.bearer_token,
654            safe_mode=safe_mode,
655        )

Delete a deployed destination from the workspace.

You can pass either the Cache class or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
659    def deploy_connection(
660        self,
661        connection_name: str,
662        *,
663        source: CloudSource | str,
664        selected_streams: list[str],
665        destination: CloudDestination | str,
666        table_prefix: str | None = None,
667    ) -> CloudConnection:
668        """Create a new connection between an already deployed source and destination.
669
670        Returns the newly deployed connection object.
671
672        Args:
673            connection_name: The name of the connection.
674            source: The deployed source. You can pass a source ID or a CloudSource object.
675            destination: The deployed destination. You can pass a destination ID or a
676                CloudDestination object.
677            table_prefix: Optional. The table prefix to use when syncing to the destination.
678            selected_streams: The selected stream names to sync within the connection.
679        """
680        if not selected_streams:
681            raise exc.PyAirbyteInputError(
682                guidance="You must provide `selected_streams` when creating a connection."
683            )
684
685        source_id: str = source if isinstance(source, str) else source.connector_id
686        destination_id: str = (
687            destination if isinstance(destination, str) else destination.connector_id
688        )
689
690        deployed_connection = api_util.create_connection(
691            name=connection_name,
692            source_id=source_id,
693            destination_id=destination_id,
694            api_root=self.api_root,
695            workspace_id=self.workspace_id,
696            selected_stream_names=selected_streams,
697            prefix=table_prefix or "",
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701        )
702
703        return CloudConnection(
704            workspace=self,
705            connection_id=deployed_connection.connection_id,
706            source=deployed_connection.source_id,
707            destination=deployed_connection.destination_id,
708        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
710    def permanently_delete_connection(
711        self,
712        connection: str | CloudConnection,
713        *,
714        cascade_delete_source: bool = False,
715        cascade_delete_destination: bool = False,
716        safe_mode: bool = True,
717    ) -> None:
718        """Delete a deployed connection from the workspace.
719
720        Args:
721            connection: The connection ID or CloudConnection object to delete
722            cascade_delete_source: If True, also delete the source after deleting the connection
723            cascade_delete_destination: If True, also delete the destination after deleting
724                the connection
725            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
726                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
727                to cascade deletes.
728        """
729        if connection is None:
730            raise ValueError("No connection ID provided.")
731
732        if isinstance(connection, str):
733            connection = CloudConnection(
734                workspace=self,
735                connection_id=connection,
736            )
737
738        api_util.delete_connection(
739            connection_id=connection.connection_id,
740            connection_name=connection.name,
741            api_root=self.api_root,
742            workspace_id=self.workspace_id,
743            client_id=self.client_id,
744            client_secret=self.client_secret,
745            bearer_token=self.bearer_token,
746            safe_mode=safe_mode,
747        )
748
749        if cascade_delete_source:
750            self.permanently_delete_source(
751                source=connection.source_id,
752                safe_mode=safe_mode,
753            )
754        if cascade_delete_destination:
755            self.permanently_delete_destination(
756                destination=connection.destination_id,
757                safe_mode=safe_mode,
758            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
762    def list_connections(
763        self,
764        name: str | None = None,
765        *,
766        name_filter: Callable | None = None,
767    ) -> list[CloudConnection]:
768        """List connections by name in the workspace.
769
770        TODO: Add pagination support
771        """
772        connections = api_util.list_connections(
773            api_root=self.api_root,
774            workspace_id=self.workspace_id,
775            name=name,
776            name_filter=name_filter,
777            client_id=self.client_id,
778            client_secret=self.client_secret,
779            bearer_token=self.bearer_token,
780        )
781        return [
782            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
783                workspace=self,
784                connection_response=connection,
785            )
786            for connection in connections
787            if name is None or connection.name == name
788        ]

List connections by name in the workspace.

TODO: Add pagination support

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
790    def list_sources(
791        self,
792        name: str | None = None,
793        *,
794        name_filter: Callable | None = None,
795    ) -> list[CloudSource]:
796        """List all sources in the workspace.
797
798        TODO: Add pagination support
799        """
800        sources = api_util.list_sources(
801            api_root=self.api_root,
802            workspace_id=self.workspace_id,
803            name=name,
804            name_filter=name_filter,
805            client_id=self.client_id,
806            client_secret=self.client_secret,
807            bearer_token=self.bearer_token,
808        )
809        return [
810            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
811                workspace=self,
812                source_response=source,
813            )
814            for source in sources
815            if name is None or source.name == name
816        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
818    def list_destinations(
819        self,
820        name: str | None = None,
821        *,
822        name_filter: Callable | None = None,
823    ) -> list[CloudDestination]:
824        """List all destinations in the workspace.
825
826        TODO: Add pagination support
827        """
828        destinations = api_util.list_destinations(
829            api_root=self.api_root,
830            workspace_id=self.workspace_id,
831            name=name,
832            name_filter=name_filter,
833            client_id=self.client_id,
834            client_secret=self.client_secret,
835            bearer_token=self.bearer_token,
836        )
837        return [
838            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
839                workspace=self,
840                destination_response=destination,
841            )
842            for destination in destinations
843            if name is None or destination.name == name
844        ]

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
846    def publish_custom_source_definition(
847        self,
848        name: str,
849        *,
850        manifest_yaml: dict[str, Any] | Path | str | None = None,
851        docker_image: str | None = None,
852        docker_tag: str | None = None,
853        unique: bool = True,
854        pre_validate: bool = True,
855        testing_values: dict[str, Any] | None = None,
856    ) -> CustomCloudSourceDefinition:
857        """Publish a custom source connector definition.
858
859        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
860        and docker_tag (for Docker connectors), but not both.
861
862        Args:
863            name: Display name for the connector definition
864            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
865            docker_image: Docker repository (e.g., 'airbyte/source-custom')
866            docker_tag: Docker image tag (e.g., '1.0.0')
867            unique: Whether to enforce name uniqueness
868            pre_validate: Whether to validate manifest client-side (YAML only)
869            testing_values: Optional configuration values to use for testing in the
870                Connector Builder UI. If provided, these values are stored as the complete
871                testing values object for the connector builder project (replaces any existing
872                values), allowing immediate test read operations.
873
874        Returns:
875            CustomCloudSourceDefinition object representing the created definition
876
877        Raises:
878            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
879            AirbyteDuplicateResourcesError: If unique=True and name already exists
880        """
881        is_yaml = manifest_yaml is not None
882        is_docker = docker_image is not None
883
884        if is_yaml == is_docker:
885            raise exc.PyAirbyteInputError(
886                message=(
887                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
888                    "docker_image + docker_tag (for Docker connectors), but not both"
889                ),
890                context={
891                    "manifest_yaml_provided": is_yaml,
892                    "docker_image_provided": is_docker,
893                },
894            )
895
896        if is_docker and docker_tag is None:
897            raise exc.PyAirbyteInputError(
898                message="docker_tag is required when docker_image is specified",
899                context={"docker_image": docker_image},
900            )
901
902        if unique:
903            existing = self.list_custom_source_definitions(
904                definition_type="yaml" if is_yaml else "docker",
905            )
906            if any(d.name == name for d in existing):
907                raise exc.AirbyteDuplicateResourcesError(
908                    resource_type="custom_source_definition",
909                    resource_name=name,
910                )
911
912        if is_yaml:
913            manifest_dict: dict[str, Any]
914            if isinstance(manifest_yaml, Path):
915                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
916            elif isinstance(manifest_yaml, str):
917                manifest_dict = yaml.safe_load(manifest_yaml)
918            elif manifest_yaml is not None:
919                manifest_dict = manifest_yaml
920            else:
921                raise exc.PyAirbyteInputError(
922                    message="manifest_yaml is required for YAML connectors",
923                    context={"name": name},
924                )
925
926            if pre_validate:
927                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
928
929            result = api_util.create_custom_yaml_source_definition(
930                name=name,
931                workspace_id=self.workspace_id,
932                manifest=manifest_dict,
933                api_root=self.api_root,
934                client_id=self.client_id,
935                client_secret=self.client_secret,
936                bearer_token=self.bearer_token,
937            )
938            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
939                self, result
940            )
941
942            # Set testing values if provided
943            if testing_values is not None:
944                custom_definition.set_testing_values(testing_values)
945
946            return custom_definition
947
948        raise NotImplementedError(
949            "Docker custom source definitions are not yet supported. "
950            "Only YAML manifest-based custom sources are currently available."
951        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
953    def list_custom_source_definitions(
954        self,
955        *,
956        definition_type: Literal["yaml", "docker"],
957    ) -> list[CustomCloudSourceDefinition]:
958        """List custom source connector definitions.
959
960        Args:
961            definition_type: Connector type to list ("yaml" or "docker"). Required.
962
963        Returns:
964            List of CustomCloudSourceDefinition objects matching the specified type
965        """
966        if definition_type == "yaml":
967            yaml_definitions = api_util.list_custom_yaml_source_definitions(
968                workspace_id=self.workspace_id,
969                api_root=self.api_root,
970                client_id=self.client_id,
971                client_secret=self.client_secret,
972                bearer_token=self.bearer_token,
973            )
974            return [
975                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
976                for d in yaml_definitions
977            ]
978
979        raise NotImplementedError(
980            "Docker custom source definitions are not yet supported. "
981            "Only YAML manifest-based custom sources are currently available."
982        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
 984    def get_custom_source_definition(
 985        self,
 986        definition_id: str,
 987        *,
 988        definition_type: Literal["yaml", "docker"],
 989    ) -> CustomCloudSourceDefinition:
 990        """Get a specific custom source definition by ID.
 991
 992        Args:
 993            definition_id: The definition ID
 994            definition_type: Connector type ("yaml" or "docker"). Required.
 995
 996        Returns:
 997            CustomCloudSourceDefinition object
 998        """
 999        if definition_type == "yaml":
1000            result = api_util.get_custom_yaml_source_definition(
1001                workspace_id=self.workspace_id,
1002                definition_id=definition_id,
1003                api_root=self.api_root,
1004                client_id=self.client_id,
1005                client_secret=self.client_secret,
1006                bearer_token=self.bearer_token,
1007            )
1008            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
1009
1010        raise NotImplementedError(
1011            "Docker custom source definitions are not yet supported. "
1012            "Only YAML manifest-based custom sources are currently available."
1013        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object