airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass, field
 39from functools import cached_property
 40from pathlib import Path
 41from typing import TYPE_CHECKING, Any, Literal, overload
 42
 43import yaml
 44
 45from airbyte import exceptions as exc
 46from airbyte._util import api_util, text_util
 47from airbyte._util.api_util import get_web_url_root
 48from airbyte.cloud._credentials import _AirbyteCredentials
 49from airbyte.cloud.client_config import CloudClientConfig
 50from airbyte.cloud.connections import CloudConnection
 51from airbyte.cloud.connectors import (
 52    CloudDestination,
 53    CloudSource,
 54    CustomCloudSourceDefinition,
 55)
 56from airbyte.cloud.models import CloudWorkspaceInfo
 57from airbyte.cloud.organizations import CloudOrganization
 58from airbyte.destinations.base import Destination
 59from airbyte.exceptions import AirbyteError
 60
 61
 62if TYPE_CHECKING:
 63    from collections.abc import Callable
 64
 65    from airbyte.secrets.base import SecretString
 66    from airbyte.sources.base import Source
 67
 68
@dataclass(init=False, kw_only=True)  # noqa: PLR0904  # Core cloud API facade.
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    The dataclass-generated `__init__` is disabled (`init=False`); the class defines its
    own keyword-only `__init__`, which resolves credentials and fills in the fields below.

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    # ID of the workspace; `__init__` raises if none can be resolved.
    workspace_id: str
    # OAuth2 client credentials as resolved by `__init__` (may be None).
    client_id: SecretString | None
    client_secret: SecretString | None
    # Public API root URL, taken from the resolved credentials.
    api_root: str
    config_api_root: str | None
    """The Config API root URL."""
    # Bearer token as resolved by `__init__` (may be None).
    bearer_token: SecretString | None

    # Internal credentials objects (set in __init__, excluded from repr)
    _credentials: _AirbyteCredentials = field(init=False, repr=False)
    _client_config: CloudClientConfig = field(init=False, repr=False)
109
110    def __init__(
111        self,
112        *,
113        workspace_id: str | None = None,
114        client_id: str | SecretString | None = None,
115        client_secret: str | SecretString | None = None,
116        api_root: str | None = None,
117        config_api_root: str | None = None,
118        bearer_token: str | SecretString | None = None,
119    ) -> None:
120        """Validate and initialize credentials."""
121        env_vars = not (client_id or client_secret or bearer_token)
122        credentials = _AirbyteCredentials.from_auth(
123            workspace_id=workspace_id,
124            client_id=client_id,
125            client_secret=client_secret,
126            bearer_token=bearer_token,
127            public_api_root=api_root,
128            config_api_root=config_api_root,
129            env_vars=env_vars,
130        )
131        if not credentials.workspace_id:
132            raise exc.PyAirbyteInputError(
133                message="Workspace ID is required.",
134                guidance="Provide a workspace ID.",
135            )
136
137        self._credentials = credentials
138        self.workspace_id = credentials.workspace_id or ""
139        self.client_id = credentials.client_id
140        self.client_secret = credentials.client_secret
141        self.bearer_token = credentials.bearer_token
142        self.api_root = credentials.public_api_root
143        self.config_api_root = credentials.config_api_root
144
145        # Create internal CloudClientConfig object (validates mutual exclusivity)
146        self._client_config = CloudClientConfig(
147            client_id=self.client_id,
148            client_secret=self.client_secret,
149            bearer_token=self.bearer_token,
150            api_root=self.api_root,
151            config_api_root=self.config_api_root,
152        )
153
154    @classmethod
155    def from_env(
156        cls,
157        workspace_id: str | None = None,
158        *,
159        api_root: str | None = None,
160        config_api_root: str | None = None,
161    ) -> CloudWorkspace:
162        """Create a CloudWorkspace using credentials from environment variables.
163
164        This factory method resolves credentials from environment variables,
165        providing a convenient way to create a workspace without explicitly
166        passing credentials.
167
168        Two authentication methods are supported (mutually exclusive):
169        1. Bearer token (checked first)
170        2. OAuth2 client credentials (fallback)
171
172        Environment variables used:
173            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
174            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
175            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
176            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
177            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
178            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.
179
180        Args:
181            workspace_id: The workspace ID. If not provided, will be resolved from
182                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
183            api_root: The API root URL. If not provided, will be resolved from
184                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
185                the Airbyte Cloud API.
186            config_api_root: The Config API root URL. If not provided, will be resolved
187                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.
188
189        Returns:
190            A CloudWorkspace instance configured with credentials from the environment.
191
192        Raises:
193            PyAirbyteInputError: If required credentials are not found in
194                the environment or are incomplete.
195
196        Example:
197            ```python
198            # With workspace_id from environment
199            workspace = CloudWorkspace.from_env()
200
201            # With explicit workspace_id
202            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
203            ```
204        """
205        return cls(
206            workspace_id=workspace_id,
207            api_root=api_root,
208            config_api_root=config_api_root,
209        )
210
211    @property
212    def workspace_url(self) -> str | None:
213        """The web URL of the workspace."""
214        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
215
216    @cached_property
217    def _organization_info(self) -> dict[str, Any]:
218        """Fetch and cache organization info for this workspace.
219
220        Uses the Config API endpoint for an efficient O(1) lookup.
221        This is an internal method; use get_organization() for public access.
222        """
223        return api_util.get_workspace_organization_info(
224            workspace_id=self.workspace_id,
225            api_root=self.api_root,
226            config_api_root=self.config_api_root,
227            client_id=self.client_id,
228            client_secret=self.client_secret,
229            bearer_token=self.bearer_token,
230        )
231
232    @overload
233    def get_organization(self) -> CloudOrganization: ...
234
235    @overload
236    def get_organization(
237        self,
238        *,
239        raise_on_error: Literal[True],
240    ) -> CloudOrganization: ...
241
242    @overload
243    def get_organization(
244        self,
245        *,
246        raise_on_error: Literal[False],
247    ) -> CloudOrganization | None: ...
248
249    def get_organization(
250        self,
251        *,
252        raise_on_error: bool = True,
253    ) -> CloudOrganization | None:
254        """Get the organization this workspace belongs to.
255
256        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
257        which may not be available with workspace-scoped credentials.
258
259        Args:
260            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
261                If False, returns None instead of raising.
262
263        Returns:
264            CloudOrganization object with organization_id and organization_name,
265            or None if raise_on_error=False and an error occurred.
266
267        Raises:
268            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
269                (e.g., due to insufficient permissions or missing data).
270        """
271        try:
272            info = self._organization_info
273        except (AirbyteError, NotImplementedError):
274            if raise_on_error:
275                raise
276            return None
277
278        organization_id = info.get("organizationId")
279        organization_name = info.get("organizationName")
280
281        # Validate that both organization_id and organization_name are non-null and non-empty
282        if not organization_id or not organization_name:
283            if raise_on_error:
284                raise AirbyteError(
285                    message="Organization info is incomplete.",
286                    context={
287                        "organization_id": organization_id,
288                        "organization_name": organization_name,
289                    },
290                )
291            return None
292
293        organization_credentials = self._credentials.with_organization_id(organization_id)
294        return CloudOrganization(
295            organization_id=organization_id,
296            organization_name=organization_name,
297            client_id=organization_credentials.client_id,
298            client_secret=organization_credentials.client_secret,
299            bearer_token=organization_credentials.bearer_token,
300            public_api_root=organization_credentials.public_api_root,
301            config_api_root=organization_credentials.config_api_root,
302        )
303
304    # Test connection and creds
305
306    def connect(self) -> None:
307        """Check that the workspace is reachable and raise an exception otherwise.
308
309        Note: It is not necessary to call this method before calling other operations. It
310              serves primarily as a simple check to ensure that the workspace is reachable
311              and credentials are correct.
312        """
313        _ = api_util.get_workspace(
314            api_root=self.api_root,
315            workspace_id=self.workspace_id,
316            client_id=self.client_id,
317            client_secret=self.client_secret,
318            bearer_token=self.bearer_token,
319        )
320        print(f"Successfully connected to workspace: {self.workspace_url}")
321
322    # Get sources, destinations, and connections
323
324    def get_connection(
325        self,
326        connection_id: str,
327    ) -> CloudConnection:
328        """Get a connection by ID.
329
330        This method does not fetch data from the API. It returns a `CloudConnection` object,
331        which will be loaded lazily as needed.
332        """
333        return CloudConnection(
334            workspace=self,
335            connection_id=connection_id,
336        )
337
338    def get_source(
339        self,
340        source_id: str,
341    ) -> CloudSource:
342        """Get a source by ID.
343
344        This method does not fetch data from the API. It returns a `CloudSource` object,
345        which will be loaded lazily as needed.
346        """
347        return CloudSource(
348            workspace=self,
349            connector_id=source_id,
350        )
351
352    def get_destination(
353        self,
354        destination_id: str,
355    ) -> CloudDestination:
356        """Get a destination by ID.
357
358        This method does not fetch data from the API. It returns a `CloudDestination` object,
359        which will be loaded lazily as needed.
360        """
361        return CloudDestination(
362            workspace=self,
363            connector_id=destination_id,
364        )
365
366    # Deploy sources and destinations
367
368    def deploy_source(
369        self,
370        name: str,
371        source: Source,
372        *,
373        unique: bool = True,
374        random_name_suffix: bool = False,
375    ) -> CloudSource:
376        """Deploy a source to the workspace.
377
378        Returns the newly deployed source.
379
380        Args:
381            name: The name to use when deploying.
382            source: The source object to deploy.
383            unique: Whether to require a unique name. If `True`, duplicate names
384                are not allowed. Defaults to `True`.
385            random_name_suffix: Whether to append a random suffix to the name.
386        """
387        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
388        source_config_dict["sourceType"] = source.name.replace("source-", "")
389
390        if random_name_suffix:
391            name += f" (ID: {text_util.generate_random_suffix()})"
392
393        if unique:
394            existing = self.list_sources(name=name)
395            if existing:
396                raise exc.AirbyteDuplicateResourcesError(
397                    resource_type="source",
398                    resource_name=name,
399                )
400
401        deployed_source = api_util.create_source(
402            name=name,
403            api_root=self.api_root,
404            workspace_id=self.workspace_id,
405            config=source_config_dict,
406            client_id=self.client_id,
407            client_secret=self.client_secret,
408            bearer_token=self.bearer_token,
409        )
410        return CloudSource(
411            workspace=self,
412            connector_id=deployed_source.source_id,
413        )
414
415    def deploy_destination(
416        self,
417        name: str,
418        destination: Destination | dict[str, Any],
419        *,
420        unique: bool = True,
421        random_name_suffix: bool = False,
422    ) -> CloudDestination:
423        """Deploy a destination to the workspace.
424
425        Returns the newly deployed destination ID.
426
427        Args:
428            name: The name to use when deploying.
429            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
430                dictionary of configuration values.
431            unique: Whether to require a unique name. If `True`, duplicate names
432                are not allowed. Defaults to `True`.
433            random_name_suffix: Whether to append a random suffix to the name.
434        """
435        if isinstance(destination, Destination):
436            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
437            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
438            # raise ValueError(destination_conf_dict)
439        else:
440            destination_conf_dict = destination.copy()
441            if "destinationType" not in destination_conf_dict:
442                raise exc.PyAirbyteInputError(
443                    message="Missing `destinationType` in configuration dictionary.",
444                )
445
446        if random_name_suffix:
447            name += f" (ID: {text_util.generate_random_suffix()})"
448
449        if unique:
450            existing = self.list_destinations(name=name)
451            if existing:
452                raise exc.AirbyteDuplicateResourcesError(
453                    resource_type="destination",
454                    resource_name=name,
455                )
456
457        deployed_destination = api_util.create_destination(
458            name=name,
459            api_root=self.api_root,
460            workspace_id=self.workspace_id,
461            config=destination_conf_dict,  # Wants a dataclass but accepts dict
462            client_id=self.client_id,
463            client_secret=self.client_secret,
464            bearer_token=self.bearer_token,
465        )
466        return CloudDestination(
467            workspace=self,
468            connector_id=deployed_destination.destination_id,
469        )
470
471    def permanently_delete_source(
472        self,
473        source: str | CloudSource,
474        *,
475        safe_mode: bool = True,
476    ) -> None:
477        """Delete a source from the workspace.
478
479        You can pass either the source ID `str` or a deployed `Source` object.
480
481        Args:
482            source: The source ID or CloudSource object to delete
483            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
484                (case insensitive) to prevent accidental deletion. Defaults to True.
485        """
486        if not isinstance(source, (str, CloudSource)):
487            raise exc.PyAirbyteInputError(
488                message="Invalid source type.",
489                input_value=type(source).__name__,
490            )
491
492        api_util.delete_source(
493            source_id=source.connector_id if isinstance(source, CloudSource) else source,
494            source_name=source.name if isinstance(source, CloudSource) else None,
495            api_root=self.api_root,
496            client_id=self.client_id,
497            client_secret=self.client_secret,
498            bearer_token=self.bearer_token,
499            safe_mode=safe_mode,
500        )
501
    # Delete destinations
503
504    def permanently_delete_destination(
505        self,
506        destination: str | CloudDestination,
507        *,
508        safe_mode: bool = True,
509    ) -> None:
510        """Delete a deployed destination from the workspace.
511
512        You can pass either the `Cache` class or the deployed destination ID as a `str`.
513
514        Args:
515            destination: The destination ID or CloudDestination object to delete
516            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
517                (case insensitive) to prevent accidental deletion. Defaults to True.
518        """
519        if not isinstance(destination, (str, CloudDestination)):
520            raise exc.PyAirbyteInputError(
521                message="Invalid destination type.",
522                input_value=type(destination).__name__,
523            )
524
525        api_util.delete_destination(
526            destination_id=(
527                destination if isinstance(destination, str) else destination.destination_id
528            ),
529            destination_name=(
530                destination.name if isinstance(destination, CloudDestination) else None
531            ),
532            api_root=self.api_root,
533            client_id=self.client_id,
534            client_secret=self.client_secret,
535            bearer_token=self.bearer_token,
536            safe_mode=safe_mode,
537        )
538
539    # Deploy and delete connections
540
541    def deploy_connection(
542        self,
543        connection_name: str,
544        *,
545        source: CloudSource | str,
546        selected_streams: list[str],
547        destination: CloudDestination | str,
548        table_prefix: str | None = None,
549    ) -> CloudConnection:
550        """Create a new connection between an already deployed source and destination.
551
552        Returns the newly deployed connection object.
553
554        Args:
555            connection_name: The name of the connection.
556            source: The deployed source. You can pass a source ID or a CloudSource object.
557            destination: The deployed destination. You can pass a destination ID or a
558                CloudDestination object.
559            table_prefix: Optional. The table prefix to use when syncing to the destination.
560            selected_streams: The selected stream names to sync within the connection.
561        """
562        if not selected_streams:
563            raise exc.PyAirbyteInputError(
564                guidance="You must provide `selected_streams` when creating a connection."
565            )
566
567        source_id: str = source if isinstance(source, str) else source.connector_id
568        destination_id: str = (
569            destination if isinstance(destination, str) else destination.connector_id
570        )
571
572        deployed_connection = api_util.create_connection(
573            name=connection_name,
574            source_id=source_id,
575            destination_id=destination_id,
576            api_root=self.api_root,
577            workspace_id=self.workspace_id,
578            selected_stream_names=selected_streams,
579            prefix=table_prefix or "",
580            client_id=self.client_id,
581            client_secret=self.client_secret,
582            bearer_token=self.bearer_token,
583        )
584
585        return CloudConnection(
586            workspace=self,
587            connection_id=deployed_connection.connection_id,
588            source=deployed_connection.source_id,
589            destination=deployed_connection.destination_id,
590        )
591
592    def permanently_delete_connection(
593        self,
594        connection: str | CloudConnection,
595        *,
596        cascade_delete_source: bool = False,
597        cascade_delete_destination: bool = False,
598        safe_mode: bool = True,
599    ) -> None:
600        """Delete a deployed connection from the workspace.
601
602        Args:
603            connection: The connection ID or CloudConnection object to delete
604            cascade_delete_source: If True, also delete the source after deleting the connection
605            cascade_delete_destination: If True, also delete the destination after deleting
606                the connection
607            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
608                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
609                to cascade deletes.
610        """
611        if connection is None:
612            raise ValueError("No connection ID provided.")
613
614        if isinstance(connection, str):
615            connection = CloudConnection(
616                workspace=self,
617                connection_id=connection,
618            )
619
620        api_util.delete_connection(
621            connection_id=connection.connection_id,
622            connection_name=connection.name,
623            api_root=self.api_root,
624            workspace_id=self.workspace_id,
625            client_id=self.client_id,
626            client_secret=self.client_secret,
627            bearer_token=self.bearer_token,
628            safe_mode=safe_mode,
629        )
630
631        if cascade_delete_source:
632            self.permanently_delete_source(
633                source=connection.source_id,
634                safe_mode=safe_mode,
635            )
636        if cascade_delete_destination:
637            self.permanently_delete_destination(
638                destination=connection.destination_id,
639                safe_mode=safe_mode,
640            )
641
642    # List workspaces, sources, destinations, and connections
643
644    def list_workspaces(
645        self,
646        name: str | None = None,
647        *,
648        name_filter: Callable | None = None,
649        limit: int | None = None,
650    ) -> list[CloudWorkspaceInfo]:
651        """List workspaces available to the current credentials, with an optional limit."""
652        return [
653            CloudWorkspaceInfo.from_api_response(workspace)
654            for workspace in api_util.list_workspaces(
655                workspace_id="",
656                api_root=self.api_root,
657                name=name,
658                name_filter=name_filter,
659                client_id=self.client_id,
660                client_secret=self.client_secret,
661                bearer_token=self.bearer_token,
662                limit=limit,
663            )
664        ]
665
666    def rename(
667        self,
668        name: str,
669    ) -> CloudWorkspace:
670        """Rename this workspace."""
671        api_util.rename_workspace(
672            workspace_id=self.workspace_id,
673            name=name,
674            api_root=self.api_root,
675            client_id=self.client_id,
676            client_secret=self.client_secret,
677            bearer_token=self.bearer_token,
678        )
679        return self
680
681    def permanently_delete(
682        self,
683        *,
684        workspace_name: str | None = None,
685        safe_mode: bool = True,
686    ) -> None:
687        """Permanently delete this workspace if it has no connections.
688
689        When `safe_mode` is enabled, the workspace name must contain `delete-me`
690        or `deleteme`. This also checks for existing connections before deleting
691        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
692        """
693        api_util.permanently_delete_workspace(
694            workspace_id=self.workspace_id,
695            workspace_name=workspace_name,
696            api_root=self.api_root,
697            client_id=self.client_id,
698            client_secret=self.client_secret,
699            bearer_token=self.bearer_token,
700            safe_mode=safe_mode,
701        )
702
703    def list_connections(
704        self,
705        name: str | None = None,
706        *,
707        name_filter: Callable | None = None,
708        limit: int | None = None,
709    ) -> list[CloudConnection]:
710        """List connections by name in the workspace, with an optional limit."""
711        connections = api_util.list_connections(
712            api_root=self.api_root,
713            workspace_id=self.workspace_id,
714            name=name,
715            name_filter=name_filter,
716            limit=limit,
717            client_id=self.client_id,
718            client_secret=self.client_secret,
719            bearer_token=self.bearer_token,
720        )
721        return [
722            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
723                workspace=self,
724                connection_response=connection,
725            )
726            for connection in connections
727        ]
728
729    def list_sources(
730        self,
731        name: str | None = None,
732        *,
733        name_filter: Callable | None = None,
734        limit: int | None = None,
735    ) -> list[CloudSource]:
736        """List all sources in the workspace, with an optional limit."""
737        sources = api_util.list_sources(
738            api_root=self.api_root,
739            workspace_id=self.workspace_id,
740            name=name,
741            name_filter=name_filter,
742            limit=limit,
743            client_id=self.client_id,
744            client_secret=self.client_secret,
745            bearer_token=self.bearer_token,
746        )
747        return [
748            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
749                workspace=self,
750                source_response=source,
751            )
752            for source in sources
753        ]
754
755    def list_destinations(
756        self,
757        name: str | None = None,
758        *,
759        name_filter: Callable | None = None,
760        limit: int | None = None,
761    ) -> list[CloudDestination]:
762        """List all destinations in the workspace, with an optional limit."""
763        destinations = api_util.list_destinations(
764            api_root=self.api_root,
765            workspace_id=self.workspace_id,
766            name=name,
767            name_filter=name_filter,
768            limit=limit,
769            client_id=self.client_id,
770            client_secret=self.client_secret,
771            bearer_token=self.bearer_token,
772        )
773        return [
774            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
775                workspace=self,
776                destination_response=destination,
777            )
778            for destination in destinations
779        ]
780
781    def publish_custom_source_definition(
782        self,
783        name: str,
784        *,
785        manifest_yaml: dict[str, Any] | Path | str | None = None,
786        docker_image: str | None = None,
787        docker_tag: str | None = None,
788        unique: bool = True,
789        pre_validate: bool = True,
790        testing_values: dict[str, Any] | None = None,
791    ) -> CustomCloudSourceDefinition:
792        """Publish a custom source connector definition.
793
794        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
795        and docker_tag (for Docker connectors), but not both.
796
797        Args:
798            name: Display name for the connector definition
799            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
800            docker_image: Docker repository (e.g., 'airbyte/source-custom')
801            docker_tag: Docker image tag (e.g., '1.0.0')
802            unique: Whether to enforce name uniqueness
803            pre_validate: Whether to validate manifest client-side (YAML only)
804            testing_values: Optional configuration values to use for testing in the
805                Connector Builder UI. If provided, these values are stored as the complete
806                testing values object for the connector builder project (replaces any existing
807                values), allowing immediate test read operations.
808
809        Returns:
810            CustomCloudSourceDefinition object representing the created definition
811
812        Raises:
813            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
814            AirbyteDuplicateResourcesError: If unique=True and name already exists
815        """
816        is_yaml = manifest_yaml is not None
817        is_docker = docker_image is not None
818
819        if is_yaml == is_docker:
820            raise exc.PyAirbyteInputError(
821                message=(
822                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
823                    "docker_image + docker_tag (for Docker connectors), but not both"
824                ),
825                context={
826                    "manifest_yaml_provided": is_yaml,
827                    "docker_image_provided": is_docker,
828                },
829            )
830
831        if is_docker and docker_tag is None:
832            raise exc.PyAirbyteInputError(
833                message="docker_tag is required when docker_image is specified",
834                context={"docker_image": docker_image},
835            )
836
837        if unique:
838            existing = self.list_custom_source_definitions(
839                definition_type="yaml" if is_yaml else "docker",
840            )
841            if any(d.name == name for d in existing):
842                raise exc.AirbyteDuplicateResourcesError(
843                    resource_type="custom_source_definition",
844                    resource_name=name,
845                )
846
847        if is_yaml:
848            manifest_dict: dict[str, Any]
849            if isinstance(manifest_yaml, Path):
850                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
851            elif isinstance(manifest_yaml, str):
852                manifest_dict = yaml.safe_load(manifest_yaml)
853            elif manifest_yaml is not None:
854                manifest_dict = manifest_yaml
855            else:
856                raise exc.PyAirbyteInputError(
857                    message="manifest_yaml is required for YAML connectors",
858                    context={"name": name},
859                )
860
861            if pre_validate:
862                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
863
864            result = api_util.create_custom_yaml_source_definition(
865                name=name,
866                workspace_id=self.workspace_id,
867                manifest=manifest_dict,
868                api_root=self.api_root,
869                client_id=self.client_id,
870                client_secret=self.client_secret,
871                bearer_token=self.bearer_token,
872            )
873            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
874                self, result
875            )
876
877            # Set testing values if provided
878            if testing_values is not None:
879                custom_definition.set_testing_values(testing_values)
880
881            return custom_definition
882
883        raise NotImplementedError(
884            "Docker custom source definitions are not yet supported. "
885            "Only YAML manifest-based custom sources are currently available."
886        )
887
888    def list_custom_source_definitions(
889        self,
890        *,
891        definition_type: Literal["yaml", "docker"],
892    ) -> list[CustomCloudSourceDefinition]:
893        """List custom source connector definitions.
894
895        Args:
896            definition_type: Connector type to list ("yaml" or "docker"). Required.
897
898        Returns:
899            List of CustomCloudSourceDefinition objects matching the specified type
900        """
901        if definition_type == "yaml":
902            yaml_definitions = api_util.list_custom_yaml_source_definitions(
903                workspace_id=self.workspace_id,
904                api_root=self.api_root,
905                client_id=self.client_id,
906                client_secret=self.client_secret,
907                bearer_token=self.bearer_token,
908            )
909            return [
910                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
911                for d in yaml_definitions
912            ]
913
914        raise NotImplementedError(
915            "Docker custom source definitions are not yet supported. "
916            "Only YAML manifest-based custom sources are currently available."
917        )
918
919    def get_custom_source_definition(
920        self,
921        definition_id: str,
922        *,
923        definition_type: Literal["yaml", "docker"],
924    ) -> CustomCloudSourceDefinition:
925        """Get a specific custom source definition by ID.
926
927        Args:
928            definition_id: The definition ID
929            definition_type: Connector type ("yaml" or "docker"). Required.
930
931        Returns:
932            CustomCloudSourceDefinition object
933        """
934        if definition_type == "yaml":
935            result = api_util.get_custom_yaml_source_definition(
936                workspace_id=self.workspace_id,
937                definition_id=definition_id,
938                api_root=self.api_root,
939                client_id=self.client_id,
940                client_secret=self.client_secret,
941                bearer_token=self.bearer_token,
942            )
943            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
944
945        raise NotImplementedError(
946            "Docker custom source definitions are not yet supported. "
947            "Only YAML manifest-based custom sources are currently available."
948        )
@dataclass(init=False, kw_only=True)
class CloudWorkspace:
 70@dataclass(init=False, kw_only=True)  # noqa: PLR0904  # Core cloud API facade.
 71class CloudWorkspace:
 72    """A remote workspace on the Airbyte Cloud.
 73
 74    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 75    instances, both OSS and Enterprise.
 76
 77    Two authentication methods are supported (mutually exclusive):
 78    1. OAuth2 client credentials (client_id + client_secret)
 79    2. Bearer token authentication
 80
 81    Example with client credentials:
 82        ```python
 83        workspace = CloudWorkspace(
 84            workspace_id="...",
 85            client_id="...",
 86            client_secret="...",
 87        )
 88        ```
 89
 90    Example with bearer token:
 91        ```python
 92        workspace = CloudWorkspace(
 93            workspace_id="...",
 94            bearer_token="...",
 95        )
 96        ```
 97    """
 98
 99    workspace_id: str
100    client_id: SecretString | None
101    client_secret: SecretString | None
102    api_root: str
103    config_api_root: str | None
104    """The Config API root URL."""
105    bearer_token: SecretString | None
106
107    # Internal credentials objects (set in __init__, excluded from repr)
108    _credentials: _AirbyteCredentials = field(init=False, repr=False)
109    _client_config: CloudClientConfig = field(init=False, repr=False)
110
111    def __init__(
112        self,
113        *,
114        workspace_id: str | None = None,
115        client_id: str | SecretString | None = None,
116        client_secret: str | SecretString | None = None,
117        api_root: str | None = None,
118        config_api_root: str | None = None,
119        bearer_token: str | SecretString | None = None,
120    ) -> None:
121        """Validate and initialize credentials."""
122        env_vars = not (client_id or client_secret or bearer_token)
123        credentials = _AirbyteCredentials.from_auth(
124            workspace_id=workspace_id,
125            client_id=client_id,
126            client_secret=client_secret,
127            bearer_token=bearer_token,
128            public_api_root=api_root,
129            config_api_root=config_api_root,
130            env_vars=env_vars,
131        )
132        if not credentials.workspace_id:
133            raise exc.PyAirbyteInputError(
134                message="Workspace ID is required.",
135                guidance="Provide a workspace ID.",
136            )
137
138        self._credentials = credentials
139        self.workspace_id = credentials.workspace_id or ""
140        self.client_id = credentials.client_id
141        self.client_secret = credentials.client_secret
142        self.bearer_token = credentials.bearer_token
143        self.api_root = credentials.public_api_root
144        self.config_api_root = credentials.config_api_root
145
146        # Create internal CloudClientConfig object (validates mutual exclusivity)
147        self._client_config = CloudClientConfig(
148            client_id=self.client_id,
149            client_secret=self.client_secret,
150            bearer_token=self.bearer_token,
151            api_root=self.api_root,
152            config_api_root=self.config_api_root,
153        )
154
155    @classmethod
156    def from_env(
157        cls,
158        workspace_id: str | None = None,
159        *,
160        api_root: str | None = None,
161        config_api_root: str | None = None,
162    ) -> CloudWorkspace:
163        """Create a CloudWorkspace using credentials from environment variables.
164
165        This factory method resolves credentials from environment variables,
166        providing a convenient way to create a workspace without explicitly
167        passing credentials.
168
169        Two authentication methods are supported (mutually exclusive):
170        1. Bearer token (checked first)
171        2. OAuth2 client credentials (fallback)
172
173        Environment variables used:
174            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
175            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
176            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
177            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
178            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
179            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.
180
181        Args:
182            workspace_id: The workspace ID. If not provided, will be resolved from
183                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
184            api_root: The API root URL. If not provided, will be resolved from
185                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
186                the Airbyte Cloud API.
187            config_api_root: The Config API root URL. If not provided, will be resolved
188                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.
189
190        Returns:
191            A CloudWorkspace instance configured with credentials from the environment.
192
193        Raises:
194            PyAirbyteInputError: If required credentials are not found in
195                the environment or are incomplete.
196
197        Example:
198            ```python
199            # With workspace_id from environment
200            workspace = CloudWorkspace.from_env()
201
202            # With explicit workspace_id
203            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
204            ```
205        """
206        return cls(
207            workspace_id=workspace_id,
208            api_root=api_root,
209            config_api_root=config_api_root,
210        )
211
212    @property
213    def workspace_url(self) -> str | None:
214        """The web URL of the workspace."""
215        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
216
217    @cached_property
218    def _organization_info(self) -> dict[str, Any]:
219        """Fetch and cache organization info for this workspace.
220
221        Uses the Config API endpoint for an efficient O(1) lookup.
222        This is an internal method; use get_organization() for public access.
223        """
224        return api_util.get_workspace_organization_info(
225            workspace_id=self.workspace_id,
226            api_root=self.api_root,
227            config_api_root=self.config_api_root,
228            client_id=self.client_id,
229            client_secret=self.client_secret,
230            bearer_token=self.bearer_token,
231        )
232
233    @overload
234    def get_organization(self) -> CloudOrganization: ...
235
236    @overload
237    def get_organization(
238        self,
239        *,
240        raise_on_error: Literal[True],
241    ) -> CloudOrganization: ...
242
243    @overload
244    def get_organization(
245        self,
246        *,
247        raise_on_error: Literal[False],
248    ) -> CloudOrganization | None: ...
249
250    def get_organization(
251        self,
252        *,
253        raise_on_error: bool = True,
254    ) -> CloudOrganization | None:
255        """Get the organization this workspace belongs to.
256
257        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
258        which may not be available with workspace-scoped credentials.
259
260        Args:
261            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
262                If False, returns None instead of raising.
263
264        Returns:
265            CloudOrganization object with organization_id and organization_name,
266            or None if raise_on_error=False and an error occurred.
267
268        Raises:
269            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
270                (e.g., due to insufficient permissions or missing data).
271        """
272        try:
273            info = self._organization_info
274        except (AirbyteError, NotImplementedError):
275            if raise_on_error:
276                raise
277            return None
278
279        organization_id = info.get("organizationId")
280        organization_name = info.get("organizationName")
281
282        # Validate that both organization_id and organization_name are non-null and non-empty
283        if not organization_id or not organization_name:
284            if raise_on_error:
285                raise AirbyteError(
286                    message="Organization info is incomplete.",
287                    context={
288                        "organization_id": organization_id,
289                        "organization_name": organization_name,
290                    },
291                )
292            return None
293
294        organization_credentials = self._credentials.with_organization_id(organization_id)
295        return CloudOrganization(
296            organization_id=organization_id,
297            organization_name=organization_name,
298            client_id=organization_credentials.client_id,
299            client_secret=organization_credentials.client_secret,
300            bearer_token=organization_credentials.bearer_token,
301            public_api_root=organization_credentials.public_api_root,
302            config_api_root=organization_credentials.config_api_root,
303        )
304
305    # Test connection and creds
306
307    def connect(self) -> None:
308        """Check that the workspace is reachable and raise an exception otherwise.
309
310        Note: It is not necessary to call this method before calling other operations. It
311              serves primarily as a simple check to ensure that the workspace is reachable
312              and credentials are correct.
313        """
314        _ = api_util.get_workspace(
315            api_root=self.api_root,
316            workspace_id=self.workspace_id,
317            client_id=self.client_id,
318            client_secret=self.client_secret,
319            bearer_token=self.bearer_token,
320        )
321        print(f"Successfully connected to workspace: {self.workspace_url}")
322
323    # Get sources, destinations, and connections
324
325    def get_connection(
326        self,
327        connection_id: str,
328    ) -> CloudConnection:
329        """Get a connection by ID.
330
331        This method does not fetch data from the API. It returns a `CloudConnection` object,
332        which will be loaded lazily as needed.
333        """
334        return CloudConnection(
335            workspace=self,
336            connection_id=connection_id,
337        )
338
339    def get_source(
340        self,
341        source_id: str,
342    ) -> CloudSource:
343        """Get a source by ID.
344
345        This method does not fetch data from the API. It returns a `CloudSource` object,
346        which will be loaded lazily as needed.
347        """
348        return CloudSource(
349            workspace=self,
350            connector_id=source_id,
351        )
352
353    def get_destination(
354        self,
355        destination_id: str,
356    ) -> CloudDestination:
357        """Get a destination by ID.
358
359        This method does not fetch data from the API. It returns a `CloudDestination` object,
360        which will be loaded lazily as needed.
361        """
362        return CloudDestination(
363            workspace=self,
364            connector_id=destination_id,
365        )
366
367    # Deploy sources and destinations
368
369    def deploy_source(
370        self,
371        name: str,
372        source: Source,
373        *,
374        unique: bool = True,
375        random_name_suffix: bool = False,
376    ) -> CloudSource:
377        """Deploy a source to the workspace.
378
379        Returns the newly deployed source.
380
381        Args:
382            name: The name to use when deploying.
383            source: The source object to deploy.
384            unique: Whether to require a unique name. If `True`, duplicate names
385                are not allowed. Defaults to `True`.
386            random_name_suffix: Whether to append a random suffix to the name.
387        """
388        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
389        source_config_dict["sourceType"] = source.name.replace("source-", "")
390
391        if random_name_suffix:
392            name += f" (ID: {text_util.generate_random_suffix()})"
393
394        if unique:
395            existing = self.list_sources(name=name)
396            if existing:
397                raise exc.AirbyteDuplicateResourcesError(
398                    resource_type="source",
399                    resource_name=name,
400                )
401
402        deployed_source = api_util.create_source(
403            name=name,
404            api_root=self.api_root,
405            workspace_id=self.workspace_id,
406            config=source_config_dict,
407            client_id=self.client_id,
408            client_secret=self.client_secret,
409            bearer_token=self.bearer_token,
410        )
411        return CloudSource(
412            workspace=self,
413            connector_id=deployed_source.source_id,
414        )
415
416    def deploy_destination(
417        self,
418        name: str,
419        destination: Destination | dict[str, Any],
420        *,
421        unique: bool = True,
422        random_name_suffix: bool = False,
423    ) -> CloudDestination:
424        """Deploy a destination to the workspace.
425
426        Returns the newly deployed destination ID.
427
428        Args:
429            name: The name to use when deploying.
430            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
431                dictionary of configuration values.
432            unique: Whether to require a unique name. If `True`, duplicate names
433                are not allowed. Defaults to `True`.
434            random_name_suffix: Whether to append a random suffix to the name.
435        """
436        if isinstance(destination, Destination):
437            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
438            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
439            # raise ValueError(destination_conf_dict)
440        else:
441            destination_conf_dict = destination.copy()
442            if "destinationType" not in destination_conf_dict:
443                raise exc.PyAirbyteInputError(
444                    message="Missing `destinationType` in configuration dictionary.",
445                )
446
447        if random_name_suffix:
448            name += f" (ID: {text_util.generate_random_suffix()})"
449
450        if unique:
451            existing = self.list_destinations(name=name)
452            if existing:
453                raise exc.AirbyteDuplicateResourcesError(
454                    resource_type="destination",
455                    resource_name=name,
456                )
457
458        deployed_destination = api_util.create_destination(
459            name=name,
460            api_root=self.api_root,
461            workspace_id=self.workspace_id,
462            config=destination_conf_dict,  # Wants a dataclass but accepts dict
463            client_id=self.client_id,
464            client_secret=self.client_secret,
465            bearer_token=self.bearer_token,
466        )
467        return CloudDestination(
468            workspace=self,
469            connector_id=deployed_destination.destination_id,
470        )
471
472    def permanently_delete_source(
473        self,
474        source: str | CloudSource,
475        *,
476        safe_mode: bool = True,
477    ) -> None:
478        """Delete a source from the workspace.
479
480        You can pass either the source ID `str` or a deployed `Source` object.
481
482        Args:
483            source: The source ID or CloudSource object to delete
484            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
485                (case insensitive) to prevent accidental deletion. Defaults to True.
486        """
487        if not isinstance(source, (str, CloudSource)):
488            raise exc.PyAirbyteInputError(
489                message="Invalid source type.",
490                input_value=type(source).__name__,
491            )
492
493        api_util.delete_source(
494            source_id=source.connector_id if isinstance(source, CloudSource) else source,
495            source_name=source.name if isinstance(source, CloudSource) else None,
496            api_root=self.api_root,
497            client_id=self.client_id,
498            client_secret=self.client_secret,
499            bearer_token=self.bearer_token,
500            safe_mode=safe_mode,
501        )
502
503    # Deploy and delete destinations
504
505    def permanently_delete_destination(
506        self,
507        destination: str | CloudDestination,
508        *,
509        safe_mode: bool = True,
510    ) -> None:
511        """Delete a deployed destination from the workspace.
512
513        You can pass either the `Cache` class or the deployed destination ID as a `str`.
514
515        Args:
516            destination: The destination ID or CloudDestination object to delete
517            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
518                (case insensitive) to prevent accidental deletion. Defaults to True.
519        """
520        if not isinstance(destination, (str, CloudDestination)):
521            raise exc.PyAirbyteInputError(
522                message="Invalid destination type.",
523                input_value=type(destination).__name__,
524            )
525
526        api_util.delete_destination(
527            destination_id=(
528                destination if isinstance(destination, str) else destination.destination_id
529            ),
530            destination_name=(
531                destination.name if isinstance(destination, CloudDestination) else None
532            ),
533            api_root=self.api_root,
534            client_id=self.client_id,
535            client_secret=self.client_secret,
536            bearer_token=self.bearer_token,
537            safe_mode=safe_mode,
538        )
539
540    # Deploy and delete connections
541
542    def deploy_connection(
543        self,
544        connection_name: str,
545        *,
546        source: CloudSource | str,
547        selected_streams: list[str],
548        destination: CloudDestination | str,
549        table_prefix: str | None = None,
550    ) -> CloudConnection:
551        """Create a new connection between an already deployed source and destination.
552
553        Returns the newly deployed connection object.
554
555        Args:
556            connection_name: The name of the connection.
557            source: The deployed source. You can pass a source ID or a CloudSource object.
558            destination: The deployed destination. You can pass a destination ID or a
559                CloudDestination object.
560            table_prefix: Optional. The table prefix to use when syncing to the destination.
561            selected_streams: The selected stream names to sync within the connection.
562        """
563        if not selected_streams:
564            raise exc.PyAirbyteInputError(
565                guidance="You must provide `selected_streams` when creating a connection."
566            )
567
568        source_id: str = source if isinstance(source, str) else source.connector_id
569        destination_id: str = (
570            destination if isinstance(destination, str) else destination.connector_id
571        )
572
573        deployed_connection = api_util.create_connection(
574            name=connection_name,
575            source_id=source_id,
576            destination_id=destination_id,
577            api_root=self.api_root,
578            workspace_id=self.workspace_id,
579            selected_stream_names=selected_streams,
580            prefix=table_prefix or "",
581            client_id=self.client_id,
582            client_secret=self.client_secret,
583            bearer_token=self.bearer_token,
584        )
585
586        return CloudConnection(
587            workspace=self,
588            connection_id=deployed_connection.connection_id,
589            source=deployed_connection.source_id,
590            destination=deployed_connection.destination_id,
591        )
592
593    def permanently_delete_connection(
594        self,
595        connection: str | CloudConnection,
596        *,
597        cascade_delete_source: bool = False,
598        cascade_delete_destination: bool = False,
599        safe_mode: bool = True,
600    ) -> None:
601        """Delete a deployed connection from the workspace.
602
603        Args:
604            connection: The connection ID or CloudConnection object to delete
605            cascade_delete_source: If True, also delete the source after deleting the connection
606            cascade_delete_destination: If True, also delete the destination after deleting
607                the connection
608            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
609                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
610                to cascade deletes.
611        """
612        if connection is None:
613            raise ValueError("No connection ID provided.")
614
615        if isinstance(connection, str):
616            connection = CloudConnection(
617                workspace=self,
618                connection_id=connection,
619            )
620
621        api_util.delete_connection(
622            connection_id=connection.connection_id,
623            connection_name=connection.name,
624            api_root=self.api_root,
625            workspace_id=self.workspace_id,
626            client_id=self.client_id,
627            client_secret=self.client_secret,
628            bearer_token=self.bearer_token,
629            safe_mode=safe_mode,
630        )
631
632        if cascade_delete_source:
633            self.permanently_delete_source(
634                source=connection.source_id,
635                safe_mode=safe_mode,
636            )
637        if cascade_delete_destination:
638            self.permanently_delete_destination(
639                destination=connection.destination_id,
640                safe_mode=safe_mode,
641            )
642
643    # List workspaces, sources, destinations, and connections
644
645    def list_workspaces(
646        self,
647        name: str | None = None,
648        *,
649        name_filter: Callable | None = None,
650        limit: int | None = None,
651    ) -> list[CloudWorkspaceInfo]:
652        """List workspaces available to the current credentials, with an optional limit."""
653        return [
654            CloudWorkspaceInfo.from_api_response(workspace)
655            for workspace in api_util.list_workspaces(
656                workspace_id="",
657                api_root=self.api_root,
658                name=name,
659                name_filter=name_filter,
660                client_id=self.client_id,
661                client_secret=self.client_secret,
662                bearer_token=self.bearer_token,
663                limit=limit,
664            )
665        ]
666
667    def rename(
668        self,
669        name: str,
670    ) -> CloudWorkspace:
671        """Rename this workspace."""
672        api_util.rename_workspace(
673            workspace_id=self.workspace_id,
674            name=name,
675            api_root=self.api_root,
676            client_id=self.client_id,
677            client_secret=self.client_secret,
678            bearer_token=self.bearer_token,
679        )
680        return self
681
682    def permanently_delete(
683        self,
684        *,
685        workspace_name: str | None = None,
686        safe_mode: bool = True,
687    ) -> None:
688        """Permanently delete this workspace if it has no connections.
689
690        When `safe_mode` is enabled, the workspace name must contain `delete-me`
691        or `deleteme`. This also checks for existing connections before deleting
692        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
693        """
694        api_util.permanently_delete_workspace(
695            workspace_id=self.workspace_id,
696            workspace_name=workspace_name,
697            api_root=self.api_root,
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701            safe_mode=safe_mode,
702        )
703
704    def list_connections(
705        self,
706        name: str | None = None,
707        *,
708        name_filter: Callable | None = None,
709        limit: int | None = None,
710    ) -> list[CloudConnection]:
711        """List connections by name in the workspace, with an optional limit."""
712        connections = api_util.list_connections(
713            api_root=self.api_root,
714            workspace_id=self.workspace_id,
715            name=name,
716            name_filter=name_filter,
717            limit=limit,
718            client_id=self.client_id,
719            client_secret=self.client_secret,
720            bearer_token=self.bearer_token,
721        )
722        return [
723            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
724                workspace=self,
725                connection_response=connection,
726            )
727            for connection in connections
728        ]
729
730    def list_sources(
731        self,
732        name: str | None = None,
733        *,
734        name_filter: Callable | None = None,
735        limit: int | None = None,
736    ) -> list[CloudSource]:
737        """List all sources in the workspace, with an optional limit."""
738        sources = api_util.list_sources(
739            api_root=self.api_root,
740            workspace_id=self.workspace_id,
741            name=name,
742            name_filter=name_filter,
743            limit=limit,
744            client_id=self.client_id,
745            client_secret=self.client_secret,
746            bearer_token=self.bearer_token,
747        )
748        return [
749            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
750                workspace=self,
751                source_response=source,
752            )
753            for source in sources
754        ]
755
756    def list_destinations(
757        self,
758        name: str | None = None,
759        *,
760        name_filter: Callable | None = None,
761        limit: int | None = None,
762    ) -> list[CloudDestination]:
763        """List all destinations in the workspace, with an optional limit."""
764        destinations = api_util.list_destinations(
765            api_root=self.api_root,
766            workspace_id=self.workspace_id,
767            name=name,
768            name_filter=name_filter,
769            limit=limit,
770            client_id=self.client_id,
771            client_secret=self.client_secret,
772            bearer_token=self.bearer_token,
773        )
774        return [
775            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
776                workspace=self,
777                destination_response=destination,
778            )
779            for destination in destinations
780        ]
781
782    def publish_custom_source_definition(
783        self,
784        name: str,
785        *,
786        manifest_yaml: dict[str, Any] | Path | str | None = None,
787        docker_image: str | None = None,
788        docker_tag: str | None = None,
789        unique: bool = True,
790        pre_validate: bool = True,
791        testing_values: dict[str, Any] | None = None,
792    ) -> CustomCloudSourceDefinition:
793        """Publish a custom source connector definition.
794
795        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
796        and docker_tag (for Docker connectors), but not both.
797
798        Args:
799            name: Display name for the connector definition
800            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
801            docker_image: Docker repository (e.g., 'airbyte/source-custom')
802            docker_tag: Docker image tag (e.g., '1.0.0')
803            unique: Whether to enforce name uniqueness
804            pre_validate: Whether to validate manifest client-side (YAML only)
805            testing_values: Optional configuration values to use for testing in the
806                Connector Builder UI. If provided, these values are stored as the complete
807                testing values object for the connector builder project (replaces any existing
808                values), allowing immediate test read operations.
809
810        Returns:
811            CustomCloudSourceDefinition object representing the created definition
812
813        Raises:
814            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
815            AirbyteDuplicateResourcesError: If unique=True and name already exists
816        """
817        is_yaml = manifest_yaml is not None
818        is_docker = docker_image is not None
819
820        if is_yaml == is_docker:
821            raise exc.PyAirbyteInputError(
822                message=(
823                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
824                    "docker_image + docker_tag (for Docker connectors), but not both"
825                ),
826                context={
827                    "manifest_yaml_provided": is_yaml,
828                    "docker_image_provided": is_docker,
829                },
830            )
831
832        if is_docker and docker_tag is None:
833            raise exc.PyAirbyteInputError(
834                message="docker_tag is required when docker_image is specified",
835                context={"docker_image": docker_image},
836            )
837
838        if unique:
839            existing = self.list_custom_source_definitions(
840                definition_type="yaml" if is_yaml else "docker",
841            )
842            if any(d.name == name for d in existing):
843                raise exc.AirbyteDuplicateResourcesError(
844                    resource_type="custom_source_definition",
845                    resource_name=name,
846                )
847
848        if is_yaml:
849            manifest_dict: dict[str, Any]
850            if isinstance(manifest_yaml, Path):
851                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
852            elif isinstance(manifest_yaml, str):
853                manifest_dict = yaml.safe_load(manifest_yaml)
854            elif manifest_yaml is not None:
855                manifest_dict = manifest_yaml
856            else:
857                raise exc.PyAirbyteInputError(
858                    message="manifest_yaml is required for YAML connectors",
859                    context={"name": name},
860                )
861
862            if pre_validate:
863                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
864
865            result = api_util.create_custom_yaml_source_definition(
866                name=name,
867                workspace_id=self.workspace_id,
868                manifest=manifest_dict,
869                api_root=self.api_root,
870                client_id=self.client_id,
871                client_secret=self.client_secret,
872                bearer_token=self.bearer_token,
873            )
874            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
875                self, result
876            )
877
878            # Set testing values if provided
879            if testing_values is not None:
880                custom_definition.set_testing_values(testing_values)
881
882            return custom_definition
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )
888
889    def list_custom_source_definitions(
890        self,
891        *,
892        definition_type: Literal["yaml", "docker"],
893    ) -> list[CustomCloudSourceDefinition]:
894        """List custom source connector definitions.
895
896        Args:
897            definition_type: Connector type to list ("yaml" or "docker"). Required.
898
899        Returns:
900            List of CustomCloudSourceDefinition objects matching the specified type
901        """
902        if definition_type == "yaml":
903            yaml_definitions = api_util.list_custom_yaml_source_definitions(
904                workspace_id=self.workspace_id,
905                api_root=self.api_root,
906                client_id=self.client_id,
907                client_secret=self.client_secret,
908                bearer_token=self.bearer_token,
909            )
910            return [
911                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
912                for d in yaml_definitions
913            ]
914
915        raise NotImplementedError(
916            "Docker custom source definitions are not yet supported. "
917            "Only YAML manifest-based custom sources are currently available."
918        )
919
920    def get_custom_source_definition(
921        self,
922        definition_id: str,
923        *,
924        definition_type: Literal["yaml", "docker"],
925    ) -> CustomCloudSourceDefinition:
926        """Get a specific custom source definition by ID.
927
928        Args:
929            definition_id: The definition ID
930            definition_type: Connector type ("yaml" or "docker"). Required.
931
932        Returns:
933            CustomCloudSourceDefinition object
934        """
935        if definition_type == "yaml":
936            result = api_util.get_custom_yaml_source_definition(
937                workspace_id=self.workspace_id,
938                definition_id=definition_id,
939                api_root=self.api_root,
940                client_id=self.client_id,
941                client_secret=self.client_secret,
942                bearer_token=self.bearer_token,
943            )
944            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
945
946        raise NotImplementedError(
947            "Docker custom source definitions are not yet supported. "
948            "Only YAML manifest-based custom sources are currently available."
949        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
Example with bearer token:
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
CloudWorkspace( *, workspace_id: str | None = None, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, api_root: str | None = None, config_api_root: str | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None)
111    def __init__(
112        self,
113        *,
114        workspace_id: str | None = None,
115        client_id: str | SecretString | None = None,
116        client_secret: str | SecretString | None = None,
117        api_root: str | None = None,
118        config_api_root: str | None = None,
119        bearer_token: str | SecretString | None = None,
120    ) -> None:
121        """Validate and initialize credentials."""
122        env_vars = not (client_id or client_secret or bearer_token)
123        credentials = _AirbyteCredentials.from_auth(
124            workspace_id=workspace_id,
125            client_id=client_id,
126            client_secret=client_secret,
127            bearer_token=bearer_token,
128            public_api_root=api_root,
129            config_api_root=config_api_root,
130            env_vars=env_vars,
131        )
132        if not credentials.workspace_id:
133            raise exc.PyAirbyteInputError(
134                message="Workspace ID is required.",
135                guidance="Provide a workspace ID.",
136            )
137
138        self._credentials = credentials
139        self.workspace_id = credentials.workspace_id or ""
140        self.client_id = credentials.client_id
141        self.client_secret = credentials.client_secret
142        self.bearer_token = credentials.bearer_token
143        self.api_root = credentials.public_api_root
144        self.config_api_root = credentials.config_api_root
145
146        # Create internal CloudClientConfig object (validates mutual exclusivity)
147        self._client_config = CloudClientConfig(
148            client_id=self.client_id,
149            client_secret=self.client_secret,
150            bearer_token=self.bearer_token,
151            api_root=self.api_root,
152            config_api_root=self.config_api_root,
153        )

Validate and initialize credentials.

workspace_id: str
client_id: airbyte.secrets.SecretString | None
client_secret: airbyte.secrets.SecretString | None
api_root: str
config_api_root: str | None

The Config API root URL.

bearer_token: airbyte.secrets.SecretString | None
@classmethod
def from_env( cls, workspace_id: str | None = None, *, api_root: str | None = None, config_api_root: str | None = None) -> CloudWorkspace:
155    @classmethod
156    def from_env(
157        cls,
158        workspace_id: str | None = None,
159        *,
160        api_root: str | None = None,
161        config_api_root: str | None = None,
162    ) -> CloudWorkspace:
163        """Create a CloudWorkspace using credentials from environment variables.
164
165        This factory method resolves credentials from environment variables,
166        providing a convenient way to create a workspace without explicitly
167        passing credentials.
168
169        Two authentication methods are supported (mutually exclusive):
170        1. Bearer token (checked first)
171        2. OAuth2 client credentials (fallback)
172
173        Environment variables used:
174            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
175            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
176            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
177            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
178            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
179            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.
180
181        Args:
182            workspace_id: The workspace ID. If not provided, will be resolved from
183                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
184            api_root: The API root URL. If not provided, will be resolved from
185                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
186                the Airbyte Cloud API.
187            config_api_root: The Config API root URL. If not provided, will be resolved
188                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.
189
190        Returns:
191            A CloudWorkspace instance configured with credentials from the environment.
192
193        Raises:
194            PyAirbyteInputError: If required credentials are not found in
195                the environment or are incomplete.
196
197        Example:
198            ```python
199            # With workspace_id from environment
200            workspace = CloudWorkspace.from_env()
201
202            # With explicit workspace_id
203            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
204            ```
205        """
206        return cls(
207            workspace_id=workspace_id,
208            api_root=api_root,
209            config_api_root=config_api_root,
210        )

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)
Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
  • AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.
Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
  • config_api_root: The Config API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteInputError: If required credentials are not found in the environment or are incomplete.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
workspace_url: str | None
212    @property
213    def workspace_url(self) -> str | None:
214        """The web URL of the workspace."""
215        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def get_organization( self, *, raise_on_error: bool = True) -> airbyte.cloud.CloudOrganization | None:
250    def get_organization(
251        self,
252        *,
253        raise_on_error: bool = True,
254    ) -> CloudOrganization | None:
255        """Get the organization this workspace belongs to.
256
257        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
258        which may not be available with workspace-scoped credentials.
259
260        Args:
261            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
262                If False, returns None instead of raising.
263
264        Returns:
265            CloudOrganization object with organization_id and organization_name,
266            or None if raise_on_error=False and an error occurred.
267
268        Raises:
269            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
270                (e.g., due to insufficient permissions or missing data).
271        """
272        try:
273            info = self._organization_info
274        except (AirbyteError, NotImplementedError):
275            if raise_on_error:
276                raise
277            return None
278
279        organization_id = info.get("organizationId")
280        organization_name = info.get("organizationName")
281
282        # Validate that both organization_id and organization_name are non-null and non-empty
283        if not organization_id or not organization_name:
284            if raise_on_error:
285                raise AirbyteError(
286                    message="Organization info is incomplete.",
287                    context={
288                        "organization_id": organization_id,
289                        "organization_name": organization_name,
290                    },
291                )
292            return None
293
294        organization_credentials = self._credentials.with_organization_id(organization_id)
295        return CloudOrganization(
296            organization_id=organization_id,
297            organization_name=organization_name,
298            client_id=organization_credentials.client_id,
299            client_secret=organization_credentials.client_secret,
300            bearer_token=organization_credentials.bearer_token,
301            public_api_root=organization_credentials.public_api_root,
302            config_api_root=organization_credentials.config_api_root,
303        )

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
def connect(self) -> None:
307    def connect(self) -> None:
308        """Check that the workspace is reachable and raise an exception otherwise.
309
310        Note: It is not necessary to call this method before calling other operations. It
311              serves primarily as a simple check to ensure that the workspace is reachable
312              and credentials are correct.
313        """
314        _ = api_util.get_workspace(
315            api_root=self.api_root,
316            workspace_id=self.workspace_id,
317            client_id=self.client_id,
318            client_secret=self.client_secret,
319            bearer_token=self.bearer_token,
320        )
321        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
325    def get_connection(
326        self,
327        connection_id: str,
328    ) -> CloudConnection:
329        """Get a connection by ID.
330
331        This method does not fetch data from the API. It returns a `CloudConnection` object,
332        which will be loaded lazily as needed.
333        """
334        return CloudConnection(
335            workspace=self,
336            connection_id=connection_id,
337        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
339    def get_source(
340        self,
341        source_id: str,
342    ) -> CloudSource:
343        """Get a source by ID.
344
345        This method does not fetch data from the API. It returns a `CloudSource` object,
346        which will be loaded lazily as needed.
347        """
348        return CloudSource(
349            workspace=self,
350            connector_id=source_id,
351        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
353    def get_destination(
354        self,
355        destination_id: str,
356    ) -> CloudDestination:
357        """Get a destination by ID.
358
359        This method does not fetch data from the API. It returns a `CloudDestination` object,
360        which will be loaded lazily as needed.
361        """
362        return CloudDestination(
363            workspace=self,
364            connector_id=destination_id,
365        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
369    def deploy_source(
370        self,
371        name: str,
372        source: Source,
373        *,
374        unique: bool = True,
375        random_name_suffix: bool = False,
376    ) -> CloudSource:
377        """Deploy a source to the workspace.
378
379        Returns the newly deployed source.
380
381        Args:
382            name: The name to use when deploying.
383            source: The source object to deploy.
384            unique: Whether to require a unique name. If `True`, duplicate names
385                are not allowed. Defaults to `True`.
386            random_name_suffix: Whether to append a random suffix to the name.
387        """
388        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
389        source_config_dict["sourceType"] = source.name.replace("source-", "")
390
391        if random_name_suffix:
392            name += f" (ID: {text_util.generate_random_suffix()})"
393
394        if unique:
395            existing = self.list_sources(name=name)
396            if existing:
397                raise exc.AirbyteDuplicateResourcesError(
398                    resource_type="source",
399                    resource_name=name,
400                )
401
402        deployed_source = api_util.create_source(
403            name=name,
404            api_root=self.api_root,
405            workspace_id=self.workspace_id,
406            config=source_config_dict,
407            client_id=self.client_id,
408            client_secret=self.client_secret,
409            bearer_token=self.bearer_token,
410        )
411        return CloudSource(
412            workspace=self,
413            connector_id=deployed_source.source_id,
414        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
416    def deploy_destination(
417        self,
418        name: str,
419        destination: Destination | dict[str, Any],
420        *,
421        unique: bool = True,
422        random_name_suffix: bool = False,
423    ) -> CloudDestination:
424        """Deploy a destination to the workspace.
425
426        Returns the newly deployed destination ID.
427
428        Args:
429            name: The name to use when deploying.
430            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
431                dictionary of configuration values.
432            unique: Whether to require a unique name. If `True`, duplicate names
433                are not allowed. Defaults to `True`.
434            random_name_suffix: Whether to append a random suffix to the name.
435        """
436        if isinstance(destination, Destination):
437            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
438            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
439            # raise ValueError(destination_conf_dict)
440        else:
441            destination_conf_dict = destination.copy()
442            if "destinationType" not in destination_conf_dict:
443                raise exc.PyAirbyteInputError(
444                    message="Missing `destinationType` in configuration dictionary.",
445                )
446
447        if random_name_suffix:
448            name += f" (ID: {text_util.generate_random_suffix()})"
449
450        if unique:
451            existing = self.list_destinations(name=name)
452            if existing:
453                raise exc.AirbyteDuplicateResourcesError(
454                    resource_type="destination",
455                    resource_name=name,
456                )
457
458        deployed_destination = api_util.create_destination(
459            name=name,
460            api_root=self.api_root,
461            workspace_id=self.workspace_id,
462            config=destination_conf_dict,  # Wants a dataclass but accepts dict
463            client_id=self.client_id,
464            client_secret=self.client_secret,
465            bearer_token=self.bearer_token,
466        )
467        return CloudDestination(
468            workspace=self,
469            connector_id=deployed_destination.destination_id,
470        )

Deploy a destination to the workspace.

Returns the newly deployed destination object.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
472    def permanently_delete_source(
473        self,
474        source: str | CloudSource,
475        *,
476        safe_mode: bool = True,
477    ) -> None:
478        """Delete a source from the workspace.
479
480        You can pass either the source ID `str` or a deployed `Source` object.
481
482        Args:
483            source: The source ID or CloudSource object to delete
484            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
485                (case insensitive) to prevent accidental deletion. Defaults to True.
486        """
487        if not isinstance(source, (str, CloudSource)):
488            raise exc.PyAirbyteInputError(
489                message="Invalid source type.",
490                input_value=type(source).__name__,
491            )
492
493        api_util.delete_source(
494            source_id=source.connector_id if isinstance(source, CloudSource) else source,
495            source_name=source.name if isinstance(source, CloudSource) else None,
496            api_root=self.api_root,
497            client_id=self.client_id,
498            client_secret=self.client_secret,
499            bearer_token=self.bearer_token,
500            safe_mode=safe_mode,
501        )

Delete a source from the workspace.

You can pass either the source ID as a str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
505    def permanently_delete_destination(
506        self,
507        destination: str | CloudDestination,
508        *,
509        safe_mode: bool = True,
510    ) -> None:
511        """Delete a deployed destination from the workspace.
512
513        You can pass either the `Cache` class or the deployed destination ID as a `str`.
514
515        Args:
516            destination: The destination ID or CloudDestination object to delete
517            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
518                (case insensitive) to prevent accidental deletion. Defaults to True.
519        """
520        if not isinstance(destination, (str, CloudDestination)):
521            raise exc.PyAirbyteInputError(
522                message="Invalid destination type.",
523                input_value=type(destination).__name__,
524            )
525
526        api_util.delete_destination(
527            destination_id=(
528                destination if isinstance(destination, str) else destination.destination_id
529            ),
530            destination_name=(
531                destination.name if isinstance(destination, CloudDestination) else None
532            ),
533            api_root=self.api_root,
534            client_id=self.client_id,
535            client_secret=self.client_secret,
536            bearer_token=self.bearer_token,
537            safe_mode=safe_mode,
538        )

Delete a deployed destination from the workspace.

You can pass either the deployed destination ID as a str or a CloudDestination object.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
542    def deploy_connection(
543        self,
544        connection_name: str,
545        *,
546        source: CloudSource | str,
547        selected_streams: list[str],
548        destination: CloudDestination | str,
549        table_prefix: str | None = None,
550    ) -> CloudConnection:
551        """Create a new connection between an already deployed source and destination.
552
553        Returns the newly deployed connection object.
554
555        Args:
556            connection_name: The name of the connection.
557            source: The deployed source. You can pass a source ID or a CloudSource object.
558            destination: The deployed destination. You can pass a destination ID or a
559                CloudDestination object.
560            table_prefix: Optional. The table prefix to use when syncing to the destination.
561            selected_streams: The selected stream names to sync within the connection.
562        """
563        if not selected_streams:
564            raise exc.PyAirbyteInputError(
565                guidance="You must provide `selected_streams` when creating a connection."
566            )
567
568        source_id: str = source if isinstance(source, str) else source.connector_id
569        destination_id: str = (
570            destination if isinstance(destination, str) else destination.connector_id
571        )
572
573        deployed_connection = api_util.create_connection(
574            name=connection_name,
575            source_id=source_id,
576            destination_id=destination_id,
577            api_root=self.api_root,
578            workspace_id=self.workspace_id,
579            selected_stream_names=selected_streams,
580            prefix=table_prefix or "",
581            client_id=self.client_id,
582            client_secret=self.client_secret,
583            bearer_token=self.bearer_token,
584        )
585
586        return CloudConnection(
587            workspace=self,
588            connection_id=deployed_connection.connection_id,
589            source=deployed_connection.source_id,
590            destination=deployed_connection.destination_id,
591        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
593    def permanently_delete_connection(
594        self,
595        connection: str | CloudConnection,
596        *,
597        cascade_delete_source: bool = False,
598        cascade_delete_destination: bool = False,
599        safe_mode: bool = True,
600    ) -> None:
601        """Delete a deployed connection from the workspace.
602
603        Args:
604            connection: The connection ID or CloudConnection object to delete
605            cascade_delete_source: If True, also delete the source after deleting the connection
606            cascade_delete_destination: If True, also delete the destination after deleting
607                the connection
608            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
609                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
610                to cascade deletes.
611        """
612        if connection is None:
613            raise ValueError("No connection ID provided.")
614
615        if isinstance(connection, str):
616            connection = CloudConnection(
617                workspace=self,
618                connection_id=connection,
619            )
620
621        api_util.delete_connection(
622            connection_id=connection.connection_id,
623            connection_name=connection.name,
624            api_root=self.api_root,
625            workspace_id=self.workspace_id,
626            client_id=self.client_id,
627            client_secret=self.client_secret,
628            bearer_token=self.bearer_token,
629            safe_mode=safe_mode,
630        )
631
632        if cascade_delete_source:
633            self.permanently_delete_source(
634                source=connection.source_id,
635                safe_mode=safe_mode,
636            )
637        if cascade_delete_destination:
638            self.permanently_delete_destination(
639                destination=connection.destination_id,
640                safe_mode=safe_mode,
641            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
def list_workspaces( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.CloudWorkspaceInfo]:
645    def list_workspaces(
646        self,
647        name: str | None = None,
648        *,
649        name_filter: Callable | None = None,
650        limit: int | None = None,
651    ) -> list[CloudWorkspaceInfo]:
652        """List workspaces available to the current credentials, with an optional limit."""
653        return [
654            CloudWorkspaceInfo.from_api_response(workspace)
655            for workspace in api_util.list_workspaces(
656                workspace_id="",
657                api_root=self.api_root,
658                name=name,
659                name_filter=name_filter,
660                client_id=self.client_id,
661                client_secret=self.client_secret,
662                bearer_token=self.bearer_token,
663                limit=limit,
664            )
665        ]

List workspaces available to the current credentials, with an optional limit.

def rename(self, name: str) -> CloudWorkspace:
667    def rename(
668        self,
669        name: str,
670    ) -> CloudWorkspace:
671        """Rename this workspace."""
672        api_util.rename_workspace(
673            workspace_id=self.workspace_id,
674            name=name,
675            api_root=self.api_root,
676            client_id=self.client_id,
677            client_secret=self.client_secret,
678            bearer_token=self.bearer_token,
679        )
680        return self

Rename this workspace.

def permanently_delete( self, *, workspace_name: str | None = None, safe_mode: bool = True) -> None:
682    def permanently_delete(
683        self,
684        *,
685        workspace_name: str | None = None,
686        safe_mode: bool = True,
687    ) -> None:
688        """Permanently delete this workspace if it has no connections.
689
690        When `safe_mode` is enabled, the workspace name must contain `delete-me`
691        or `deleteme`. This also checks for existing connections before deleting
692        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
693        """
694        api_util.permanently_delete_workspace(
695            workspace_id=self.workspace_id,
696            workspace_name=workspace_name,
697            api_root=self.api_root,
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701            safe_mode=safe_mode,
702        )

Permanently delete this workspace if it has no connections.

When safe_mode is enabled, the workspace name must contain delete-me or deleteme. This also checks for existing connections before deleting and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.CloudConnection]:
704    def list_connections(
705        self,
706        name: str | None = None,
707        *,
708        name_filter: Callable | None = None,
709        limit: int | None = None,
710    ) -> list[CloudConnection]:
711        """List connections by name in the workspace, with an optional limit."""
712        connections = api_util.list_connections(
713            api_root=self.api_root,
714            workspace_id=self.workspace_id,
715            name=name,
716            name_filter=name_filter,
717            limit=limit,
718            client_id=self.client_id,
719            client_secret=self.client_secret,
720            bearer_token=self.bearer_token,
721        )
722        return [
723            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
724                workspace=self,
725                connection_response=connection,
726            )
727            for connection in connections
728        ]

List connections by name in the workspace, with an optional limit.

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
730    def list_sources(
731        self,
732        name: str | None = None,
733        *,
734        name_filter: Callable | None = None,
735        limit: int | None = None,
736    ) -> list[CloudSource]:
737        """List all sources in the workspace, with an optional limit."""
738        sources = api_util.list_sources(
739            api_root=self.api_root,
740            workspace_id=self.workspace_id,
741            name=name,
742            name_filter=name_filter,
743            limit=limit,
744            client_id=self.client_id,
745            client_secret=self.client_secret,
746            bearer_token=self.bearer_token,
747        )
748        return [
749            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
750                workspace=self,
751                source_response=source,
752            )
753            for source in sources
754        ]

List all sources in the workspace, with an optional limit.

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
756    def list_destinations(
757        self,
758        name: str | None = None,
759        *,
760        name_filter: Callable | None = None,
761        limit: int | None = None,
762    ) -> list[CloudDestination]:
763        """List all destinations in the workspace, with an optional limit."""
764        destinations = api_util.list_destinations(
765            api_root=self.api_root,
766            workspace_id=self.workspace_id,
767            name=name,
768            name_filter=name_filter,
769            limit=limit,
770            client_id=self.client_id,
771            client_secret=self.client_secret,
772            bearer_token=self.bearer_token,
773        )
774        return [
775            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
776                workspace=self,
777                destination_response=destination,
778            )
779            for destination in destinations
780        ]

List all destinations in the workspace, with an optional limit.

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
782    def publish_custom_source_definition(
783        self,
784        name: str,
785        *,
786        manifest_yaml: dict[str, Any] | Path | str | None = None,
787        docker_image: str | None = None,
788        docker_tag: str | None = None,
789        unique: bool = True,
790        pre_validate: bool = True,
791        testing_values: dict[str, Any] | None = None,
792    ) -> CustomCloudSourceDefinition:
793        """Publish a custom source connector definition.
794
795        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
796        and docker_tag (for Docker connectors), but not both.
797
798        Args:
799            name: Display name for the connector definition
800            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
801            docker_image: Docker repository (e.g., 'airbyte/source-custom')
802            docker_tag: Docker image tag (e.g., '1.0.0')
803            unique: Whether to enforce name uniqueness
804            pre_validate: Whether to validate manifest client-side (YAML only)
805            testing_values: Optional configuration values to use for testing in the
806                Connector Builder UI. If provided, these values are stored as the complete
807                testing values object for the connector builder project (replaces any existing
808                values), allowing immediate test read operations.
809
810        Returns:
811            CustomCloudSourceDefinition object representing the created definition
812
813        Raises:
814            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
815            AirbyteDuplicateResourcesError: If unique=True and name already exists
816        """
817        is_yaml = manifest_yaml is not None
818        is_docker = docker_image is not None
819
820        if is_yaml == is_docker:
821            raise exc.PyAirbyteInputError(
822                message=(
823                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
824                    "docker_image + docker_tag (for Docker connectors), but not both"
825                ),
826                context={
827                    "manifest_yaml_provided": is_yaml,
828                    "docker_image_provided": is_docker,
829                },
830            )
831
832        if is_docker and docker_tag is None:
833            raise exc.PyAirbyteInputError(
834                message="docker_tag is required when docker_image is specified",
835                context={"docker_image": docker_image},
836            )
837
838        if unique:
839            existing = self.list_custom_source_definitions(
840                definition_type="yaml" if is_yaml else "docker",
841            )
842            if any(d.name == name for d in existing):
843                raise exc.AirbyteDuplicateResourcesError(
844                    resource_type="custom_source_definition",
845                    resource_name=name,
846                )
847
848        if is_yaml:
849            manifest_dict: dict[str, Any]
850            if isinstance(manifest_yaml, Path):
851                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
852            elif isinstance(manifest_yaml, str):
853                manifest_dict = yaml.safe_load(manifest_yaml)
854            elif manifest_yaml is not None:
855                manifest_dict = manifest_yaml
856            else:
857                raise exc.PyAirbyteInputError(
858                    message="manifest_yaml is required for YAML connectors",
859                    context={"name": name},
860                )
861
862            if pre_validate:
863                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
864
865            result = api_util.create_custom_yaml_source_definition(
866                name=name,
867                workspace_id=self.workspace_id,
868                manifest=manifest_dict,
869                api_root=self.api_root,
870                client_id=self.client_id,
871                client_secret=self.client_secret,
872                bearer_token=self.bearer_token,
873            )
874            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
875                self, result
876            )
877
878            # Set testing values if provided
879            if testing_values is not None:
880                custom_definition.set_testing_values(testing_values)
881
882            return custom_definition
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image are provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
889    def list_custom_source_definitions(
890        self,
891        *,
892        definition_type: Literal["yaml", "docker"],
893    ) -> list[CustomCloudSourceDefinition]:
894        """List custom source connector definitions.
895
896        Args:
897            definition_type: Connector type to list ("yaml" or "docker"). Required.
898
899        Returns:
900            List of CustomCloudSourceDefinition objects matching the specified type
901        """
902        if definition_type == "yaml":
903            yaml_definitions = api_util.list_custom_yaml_source_definitions(
904                workspace_id=self.workspace_id,
905                api_root=self.api_root,
906                client_id=self.client_id,
907                client_secret=self.client_secret,
908                bearer_token=self.bearer_token,
909            )
910            return [
911                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
912                for d in yaml_definitions
913            ]
914
915        raise NotImplementedError(
916            "Docker custom source definitions are not yet supported. "
917            "Only YAML manifest-based custom sources are currently available."
918        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
920    def get_custom_source_definition(
921        self,
922        definition_id: str,
923        *,
924        definition_type: Literal["yaml", "docker"],
925    ) -> CustomCloudSourceDefinition:
926        """Get a specific custom source definition by ID.
927
928        Args:
929            definition_id: The definition ID
930            definition_type: Connector type ("yaml" or "docker"). Required.
931
932        Returns:
933            CustomCloudSourceDefinition object
934        """
935        if definition_type == "yaml":
936            result = api_util.get_custom_yaml_source_definition(
937                workspace_id=self.workspace_id,
938                definition_id=definition_id,
939                api_root=self.api_root,
940                client_id=self.client_id,
941                client_secret=self.client_secret,
942                bearer_token=self.bearer_token,
943            )
944            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
945
946        raise NotImplementedError(
947            "Docker custom source definitions are not yet supported. "
948            "Only YAML manifest-based custom sources are currently available."
949        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object