airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass, field
 39from functools import cached_property
 40from pathlib import Path
 41from typing import TYPE_CHECKING, Any, Literal, overload
 42
 43import yaml
 44
 45from airbyte import exceptions as exc
 46from airbyte._util import api_util, text_util
 47from airbyte._util.api_util import get_web_url_root
 48from airbyte.cloud.auth import (
 49    resolve_cloud_api_url,
 50    resolve_cloud_bearer_token,
 51    resolve_cloud_client_id,
 52    resolve_cloud_client_secret,
 53    resolve_cloud_workspace_id,
 54)
 55from airbyte.cloud.client_config import CloudClientConfig
 56from airbyte.cloud.connections import CloudConnection
 57from airbyte.cloud.connectors import (
 58    CloudDestination,
 59    CloudSource,
 60    CustomCloudSourceDefinition,
 61)
 62from airbyte.destinations.base import Destination
 63from airbyte.exceptions import AirbyteError
 64from airbyte.secrets.base import SecretString
 65
 66
 67if TYPE_CHECKING:
 68    from collections.abc import Callable
 69
 70    from airbyte.sources.base import Source
 71
 72
 73@dataclass
 74class CloudOrganization:
 75    """Information about an organization in Airbyte Cloud.
 76
 77    This is a minimal value object returned by CloudWorkspace.get_organization().
 78    """
 79
 80    organization_id: str
 81    """The organization ID."""
 82
 83    organization_name: str | None = None
 84    """Display name of the organization."""
 85
 86
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    # ID of the workspace all operations are scoped to.
    workspace_id: str
    # OAuth2 client credentials; used together, and mutually exclusive with bearer_token
    # (enforced by CloudClientConfig in __post_init__).
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    # API root URL; override to target a self-managed (OSS/Enterprise) instance.
    api_root: str = api_util.CLOUD_API_ROOT
    # Bearer token auth; mutually exclusive with the client-credential pair above.
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)
124
125    def __post_init__(self) -> None:
126        """Validate and initialize credentials."""
127        # Wrap secrets in SecretString if provided
128        if self.client_id is not None:
129            self.client_id = SecretString(self.client_id)
130        if self.client_secret is not None:
131            self.client_secret = SecretString(self.client_secret)
132        if self.bearer_token is not None:
133            self.bearer_token = SecretString(self.bearer_token)
134
135        # Create internal CloudClientConfig object (validates mutual exclusivity)
136        self._credentials = CloudClientConfig(
137            client_id=self.client_id,
138            client_secret=self.client_secret,
139            bearer_token=self.bearer_token,
140            api_root=self.api_root,
141        )
142
143    @classmethod
144    def from_env(
145        cls,
146        workspace_id: str | None = None,
147        *,
148        api_root: str | None = None,
149    ) -> CloudWorkspace:
150        """Create a CloudWorkspace using credentials from environment variables.
151
152        This factory method resolves credentials from environment variables,
153        providing a convenient way to create a workspace without explicitly
154        passing credentials.
155
156        Two authentication methods are supported (mutually exclusive):
157        1. Bearer token (checked first)
158        2. OAuth2 client credentials (fallback)
159
160        Environment variables used:
161            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
162            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
163            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
164            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
165            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
166
167        Args:
168            workspace_id: The workspace ID. If not provided, will be resolved from
169                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
170            api_root: The API root URL. If not provided, will be resolved from
171                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
172                the Airbyte Cloud API.
173
174        Returns:
175            A CloudWorkspace instance configured with credentials from the environment.
176
177        Raises:
178            PyAirbyteSecretNotFoundError: If required credentials are not found in
179                the environment.
180
181        Example:
182            ```python
183            # With workspace_id from environment
184            workspace = CloudWorkspace.from_env()
185
186            # With explicit workspace_id
187            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
188            ```
189        """
190        resolved_api_root = resolve_cloud_api_url(api_root)
191
192        # Try bearer token first
193        bearer_token = resolve_cloud_bearer_token()
194        if bearer_token:
195            return cls(
196                workspace_id=resolve_cloud_workspace_id(workspace_id),
197                bearer_token=bearer_token,
198                api_root=resolved_api_root,
199            )
200
201        # Fall back to client credentials
202        return cls(
203            workspace_id=resolve_cloud_workspace_id(workspace_id),
204            client_id=resolve_cloud_client_id(),
205            client_secret=resolve_cloud_client_secret(),
206            api_root=resolved_api_root,
207        )
208
209    @property
210    def workspace_url(self) -> str | None:
211        """The web URL of the workspace."""
212        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
213
    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        Cached for the lifetime of this instance via `cached_property`, so the
        API is hit at most once per workspace object.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
228
229    @overload
230    def get_organization(self) -> CloudOrganization: ...
231
232    @overload
233    def get_organization(
234        self,
235        *,
236        raise_on_error: Literal[True],
237    ) -> CloudOrganization: ...
238
239    @overload
240    def get_organization(
241        self,
242        *,
243        raise_on_error: Literal[False],
244    ) -> CloudOrganization | None: ...
245
246    def get_organization(
247        self,
248        *,
249        raise_on_error: bool = True,
250    ) -> CloudOrganization | None:
251        """Get the organization this workspace belongs to.
252
253        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
254        which may not be available with workspace-scoped credentials.
255
256        Args:
257            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
258                If False, returns None instead of raising.
259
260        Returns:
261            CloudOrganization object with organization_id and organization_name,
262            or None if raise_on_error=False and an error occurred.
263
264        Raises:
265            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
266                (e.g., due to insufficient permissions or missing data).
267        """
268        try:
269            info = self._organization_info
270        except (AirbyteError, NotImplementedError):
271            if raise_on_error:
272                raise
273            return None
274
275        organization_id = info.get("organizationId")
276        organization_name = info.get("organizationName")
277
278        # Validate that both organization_id and organization_name are non-null and non-empty
279        if not organization_id or not organization_name:
280            if raise_on_error:
281                raise AirbyteError(
282                    message="Organization info is incomplete.",
283                    context={
284                        "organization_id": organization_id,
285                        "organization_name": organization_name,
286                    },
287                )
288            return None
289
290        return CloudOrganization(
291            organization_id=organization_id,
292            organization_name=organization_name,
293        )
294
295    # Test connection and creds
296
297    def connect(self) -> None:
298        """Check that the workspace is reachable and raise an exception otherwise.
299
300        Note: It is not necessary to call this method before calling other operations. It
301              serves primarily as a simple check to ensure that the workspace is reachable
302              and credentials are correct.
303        """
304        _ = api_util.get_workspace(
305            api_root=self.api_root,
306            workspace_id=self.workspace_id,
307            client_id=self.client_id,
308            client_secret=self.client_secret,
309            bearer_token=self.bearer_token,
310        )
311        print(f"Successfully connected to workspace: {self.workspace_url}")
312
313    # Get sources, destinations, and connections
314
315    def get_connection(
316        self,
317        connection_id: str,
318    ) -> CloudConnection:
319        """Get a connection by ID.
320
321        This method does not fetch data from the API. It returns a `CloudConnection` object,
322        which will be loaded lazily as needed.
323        """
324        return CloudConnection(
325            workspace=self,
326            connection_id=connection_id,
327        )
328
329    def get_source(
330        self,
331        source_id: str,
332    ) -> CloudSource:
333        """Get a source by ID.
334
335        This method does not fetch data from the API. It returns a `CloudSource` object,
336        which will be loaded lazily as needed.
337        """
338        return CloudSource(
339            workspace=self,
340            connector_id=source_id,
341        )
342
343    def get_destination(
344        self,
345        destination_id: str,
346    ) -> CloudDestination:
347        """Get a destination by ID.
348
349        This method does not fetch data from the API. It returns a `CloudDestination` object,
350        which will be loaded lazily as needed.
351        """
352        return CloudDestination(
353            workspace=self,
354            connector_id=destination_id,
355        )
356
357    # Deploy sources and destinations
358
359    def deploy_source(
360        self,
361        name: str,
362        source: Source,
363        *,
364        unique: bool = True,
365        random_name_suffix: bool = False,
366    ) -> CloudSource:
367        """Deploy a source to the workspace.
368
369        Returns the newly deployed source.
370
371        Args:
372            name: The name to use when deploying.
373            source: The source object to deploy.
374            unique: Whether to require a unique name. If `True`, duplicate names
375                are not allowed. Defaults to `True`.
376            random_name_suffix: Whether to append a random suffix to the name.
377        """
378        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
379        source_config_dict["sourceType"] = source.name.replace("source-", "")
380
381        if random_name_suffix:
382            name += f" (ID: {text_util.generate_random_suffix()})"
383
384        if unique:
385            existing = self.list_sources(name=name)
386            if existing:
387                raise exc.AirbyteDuplicateResourcesError(
388                    resource_type="source",
389                    resource_name=name,
390                )
391
392        deployed_source = api_util.create_source(
393            name=name,
394            api_root=self.api_root,
395            workspace_id=self.workspace_id,
396            config=source_config_dict,
397            client_id=self.client_id,
398            client_secret=self.client_secret,
399            bearer_token=self.bearer_token,
400        )
401        return CloudSource(
402            workspace=self,
403            connector_id=deployed_source.source_id,
404        )
405
406    def deploy_destination(
407        self,
408        name: str,
409        destination: Destination | dict[str, Any],
410        *,
411        unique: bool = True,
412        random_name_suffix: bool = False,
413    ) -> CloudDestination:
414        """Deploy a destination to the workspace.
415
416        Returns the newly deployed destination ID.
417
418        Args:
419            name: The name to use when deploying.
420            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
421                dictionary of configuration values.
422            unique: Whether to require a unique name. If `True`, duplicate names
423                are not allowed. Defaults to `True`.
424            random_name_suffix: Whether to append a random suffix to the name.
425        """
426        if isinstance(destination, Destination):
427            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
428            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
429            # raise ValueError(destination_conf_dict)
430        else:
431            destination_conf_dict = destination.copy()
432            if "destinationType" not in destination_conf_dict:
433                raise exc.PyAirbyteInputError(
434                    message="Missing `destinationType` in configuration dictionary.",
435                )
436
437        if random_name_suffix:
438            name += f" (ID: {text_util.generate_random_suffix()})"
439
440        if unique:
441            existing = self.list_destinations(name=name)
442            if existing:
443                raise exc.AirbyteDuplicateResourcesError(
444                    resource_type="destination",
445                    resource_name=name,
446                )
447
448        deployed_destination = api_util.create_destination(
449            name=name,
450            api_root=self.api_root,
451            workspace_id=self.workspace_id,
452            config=destination_conf_dict,  # Wants a dataclass but accepts dict
453            client_id=self.client_id,
454            client_secret=self.client_secret,
455            bearer_token=self.bearer_token,
456        )
457        return CloudDestination(
458            workspace=self,
459            connector_id=deployed_destination.destination_id,
460        )
461
462    def permanently_delete_source(
463        self,
464        source: str | CloudSource,
465        *,
466        safe_mode: bool = True,
467    ) -> None:
468        """Delete a source from the workspace.
469
470        You can pass either the source ID `str` or a deployed `Source` object.
471
472        Args:
473            source: The source ID or CloudSource object to delete
474            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
475                (case insensitive) to prevent accidental deletion. Defaults to True.
476        """
477        if not isinstance(source, (str, CloudSource)):
478            raise exc.PyAirbyteInputError(
479                message="Invalid source type.",
480                input_value=type(source).__name__,
481            )
482
483        api_util.delete_source(
484            source_id=source.connector_id if isinstance(source, CloudSource) else source,
485            source_name=source.name if isinstance(source, CloudSource) else None,
486            api_root=self.api_root,
487            client_id=self.client_id,
488            client_secret=self.client_secret,
489            bearer_token=self.bearer_token,
490            safe_mode=safe_mode,
491        )
492
493    # Deploy and delete destinations
494
495    def permanently_delete_destination(
496        self,
497        destination: str | CloudDestination,
498        *,
499        safe_mode: bool = True,
500    ) -> None:
501        """Delete a deployed destination from the workspace.
502
503        You can pass either the `Cache` class or the deployed destination ID as a `str`.
504
505        Args:
506            destination: The destination ID or CloudDestination object to delete
507            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
508                (case insensitive) to prevent accidental deletion. Defaults to True.
509        """
510        if not isinstance(destination, (str, CloudDestination)):
511            raise exc.PyAirbyteInputError(
512                message="Invalid destination type.",
513                input_value=type(destination).__name__,
514            )
515
516        api_util.delete_destination(
517            destination_id=(
518                destination if isinstance(destination, str) else destination.destination_id
519            ),
520            destination_name=(
521                destination.name if isinstance(destination, CloudDestination) else None
522            ),
523            api_root=self.api_root,
524            client_id=self.client_id,
525            client_secret=self.client_secret,
526            bearer_token=self.bearer_token,
527            safe_mode=safe_mode,
528        )
529
530    # Deploy and delete connections
531
532    def deploy_connection(
533        self,
534        connection_name: str,
535        *,
536        source: CloudSource | str,
537        selected_streams: list[str],
538        destination: CloudDestination | str,
539        table_prefix: str | None = None,
540    ) -> CloudConnection:
541        """Create a new connection between an already deployed source and destination.
542
543        Returns the newly deployed connection object.
544
545        Args:
546            connection_name: The name of the connection.
547            source: The deployed source. You can pass a source ID or a CloudSource object.
548            destination: The deployed destination. You can pass a destination ID or a
549                CloudDestination object.
550            table_prefix: Optional. The table prefix to use when syncing to the destination.
551            selected_streams: The selected stream names to sync within the connection.
552        """
553        if not selected_streams:
554            raise exc.PyAirbyteInputError(
555                guidance="You must provide `selected_streams` when creating a connection."
556            )
557
558        source_id: str = source if isinstance(source, str) else source.connector_id
559        destination_id: str = (
560            destination if isinstance(destination, str) else destination.connector_id
561        )
562
563        deployed_connection = api_util.create_connection(
564            name=connection_name,
565            source_id=source_id,
566            destination_id=destination_id,
567            api_root=self.api_root,
568            workspace_id=self.workspace_id,
569            selected_stream_names=selected_streams,
570            prefix=table_prefix or "",
571            client_id=self.client_id,
572            client_secret=self.client_secret,
573            bearer_token=self.bearer_token,
574        )
575
576        return CloudConnection(
577            workspace=self,
578            connection_id=deployed_connection.connection_id,
579            source=deployed_connection.source_id,
580            destination=deployed_connection.destination_id,
581        )
582
583    def permanently_delete_connection(
584        self,
585        connection: str | CloudConnection,
586        *,
587        cascade_delete_source: bool = False,
588        cascade_delete_destination: bool = False,
589        safe_mode: bool = True,
590    ) -> None:
591        """Delete a deployed connection from the workspace.
592
593        Args:
594            connection: The connection ID or CloudConnection object to delete
595            cascade_delete_source: If True, also delete the source after deleting the connection
596            cascade_delete_destination: If True, also delete the destination after deleting
597                the connection
598            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
599                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
600                to cascade deletes.
601        """
602        if connection is None:
603            raise ValueError("No connection ID provided.")
604
605        if isinstance(connection, str):
606            connection = CloudConnection(
607                workspace=self,
608                connection_id=connection,
609            )
610
611        api_util.delete_connection(
612            connection_id=connection.connection_id,
613            connection_name=connection.name,
614            api_root=self.api_root,
615            workspace_id=self.workspace_id,
616            client_id=self.client_id,
617            client_secret=self.client_secret,
618            bearer_token=self.bearer_token,
619            safe_mode=safe_mode,
620        )
621
622        if cascade_delete_source:
623            self.permanently_delete_source(
624                source=connection.source_id,
625                safe_mode=safe_mode,
626            )
627        if cascade_delete_destination:
628            self.permanently_delete_destination(
629                destination=connection.destination_id,
630                safe_mode=safe_mode,
631            )
632
633    # List sources, destinations, and connections
634
635    def list_connections(
636        self,
637        name: str | None = None,
638        *,
639        name_filter: Callable | None = None,
640    ) -> list[CloudConnection]:
641        """List connections by name in the workspace.
642
643        TODO: Add pagination support
644        """
645        connections = api_util.list_connections(
646            api_root=self.api_root,
647            workspace_id=self.workspace_id,
648            name=name,
649            name_filter=name_filter,
650            client_id=self.client_id,
651            client_secret=self.client_secret,
652            bearer_token=self.bearer_token,
653        )
654        return [
655            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
656                workspace=self,
657                connection_response=connection,
658            )
659            for connection in connections
660            if name is None or connection.name == name
661        ]
662
663    def list_sources(
664        self,
665        name: str | None = None,
666        *,
667        name_filter: Callable | None = None,
668    ) -> list[CloudSource]:
669        """List all sources in the workspace.
670
671        TODO: Add pagination support
672        """
673        sources = api_util.list_sources(
674            api_root=self.api_root,
675            workspace_id=self.workspace_id,
676            name=name,
677            name_filter=name_filter,
678            client_id=self.client_id,
679            client_secret=self.client_secret,
680            bearer_token=self.bearer_token,
681        )
682        return [
683            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
684                workspace=self,
685                source_response=source,
686            )
687            for source in sources
688            if name is None or source.name == name
689        ]
690
691    def list_destinations(
692        self,
693        name: str | None = None,
694        *,
695        name_filter: Callable | None = None,
696    ) -> list[CloudDestination]:
697        """List all destinations in the workspace.
698
699        TODO: Add pagination support
700        """
701        destinations = api_util.list_destinations(
702            api_root=self.api_root,
703            workspace_id=self.workspace_id,
704            name=name,
705            name_filter=name_filter,
706            client_id=self.client_id,
707            client_secret=self.client_secret,
708            bearer_token=self.bearer_token,
709        )
710        return [
711            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
712                workspace=self,
713                destination_response=destination,
714            )
715            for destination in destinations
716            if name is None or destination.name == name
717        ]
718
719    def publish_custom_source_definition(
720        self,
721        name: str,
722        *,
723        manifest_yaml: dict[str, Any] | Path | str | None = None,
724        docker_image: str | None = None,
725        docker_tag: str | None = None,
726        unique: bool = True,
727        pre_validate: bool = True,
728        testing_values: dict[str, Any] | None = None,
729    ) -> CustomCloudSourceDefinition:
730        """Publish a custom source connector definition.
731
732        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
733        and docker_tag (for Docker connectors), but not both.
734
735        Args:
736            name: Display name for the connector definition
737            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
738            docker_image: Docker repository (e.g., 'airbyte/source-custom')
739            docker_tag: Docker image tag (e.g., '1.0.0')
740            unique: Whether to enforce name uniqueness
741            pre_validate: Whether to validate manifest client-side (YAML only)
742            testing_values: Optional configuration values to use for testing in the
743                Connector Builder UI. If provided, these values are stored as the complete
744                testing values object for the connector builder project (replaces any existing
745                values), allowing immediate test read operations.
746
747        Returns:
748            CustomCloudSourceDefinition object representing the created definition
749
750        Raises:
751            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
752            AirbyteDuplicateResourcesError: If unique=True and name already exists
753        """
754        is_yaml = manifest_yaml is not None
755        is_docker = docker_image is not None
756
757        if is_yaml == is_docker:
758            raise exc.PyAirbyteInputError(
759                message=(
760                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
761                    "docker_image + docker_tag (for Docker connectors), but not both"
762                ),
763                context={
764                    "manifest_yaml_provided": is_yaml,
765                    "docker_image_provided": is_docker,
766                },
767            )
768
769        if is_docker and docker_tag is None:
770            raise exc.PyAirbyteInputError(
771                message="docker_tag is required when docker_image is specified",
772                context={"docker_image": docker_image},
773            )
774
775        if unique:
776            existing = self.list_custom_source_definitions(
777                definition_type="yaml" if is_yaml else "docker",
778            )
779            if any(d.name == name for d in existing):
780                raise exc.AirbyteDuplicateResourcesError(
781                    resource_type="custom_source_definition",
782                    resource_name=name,
783                )
784
785        if is_yaml:
786            manifest_dict: dict[str, Any]
787            if isinstance(manifest_yaml, Path):
788                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
789            elif isinstance(manifest_yaml, str):
790                manifest_dict = yaml.safe_load(manifest_yaml)
791            elif manifest_yaml is not None:
792                manifest_dict = manifest_yaml
793            else:
794                raise exc.PyAirbyteInputError(
795                    message="manifest_yaml is required for YAML connectors",
796                    context={"name": name},
797                )
798
799            if pre_validate:
800                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
801
802            result = api_util.create_custom_yaml_source_definition(
803                name=name,
804                workspace_id=self.workspace_id,
805                manifest=manifest_dict,
806                api_root=self.api_root,
807                client_id=self.client_id,
808                client_secret=self.client_secret,
809                bearer_token=self.bearer_token,
810            )
811            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
812                self, result
813            )
814
815            # Set testing values if provided
816            if testing_values is not None:
817                custom_definition.set_testing_values(testing_values)
818
819            return custom_definition
820
821        raise NotImplementedError(
822            "Docker custom source definitions are not yet supported. "
823            "Only YAML manifest-based custom sources are currently available."
824        )
825
826    def list_custom_source_definitions(
827        self,
828        *,
829        definition_type: Literal["yaml", "docker"],
830    ) -> list[CustomCloudSourceDefinition]:
831        """List custom source connector definitions.
832
833        Args:
834            definition_type: Connector type to list ("yaml" or "docker"). Required.
835
836        Returns:
837            List of CustomCloudSourceDefinition objects matching the specified type
838        """
839        if definition_type == "yaml":
840            yaml_definitions = api_util.list_custom_yaml_source_definitions(
841                workspace_id=self.workspace_id,
842                api_root=self.api_root,
843                client_id=self.client_id,
844                client_secret=self.client_secret,
845                bearer_token=self.bearer_token,
846            )
847            return [
848                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
849                for d in yaml_definitions
850            ]
851
852        raise NotImplementedError(
853            "Docker custom source definitions are not yet supported. "
854            "Only YAML manifest-based custom sources are currently available."
855        )
856
857    def get_custom_source_definition(
858        self,
859        definition_id: str,
860        *,
861        definition_type: Literal["yaml", "docker"],
862    ) -> CustomCloudSourceDefinition:
863        """Get a specific custom source definition by ID.
864
865        Args:
866            definition_id: The definition ID
867            definition_type: Connector type ("yaml" or "docker"). Required.
868
869        Returns:
870            CustomCloudSourceDefinition object
871        """
872        if definition_type == "yaml":
873            result = api_util.get_custom_yaml_source_definition(
874                workspace_id=self.workspace_id,
875                definition_id=definition_id,
876                api_root=self.api_root,
877                client_id=self.client_id,
878                client_secret=self.client_secret,
879                bearer_token=self.bearer_token,
880            )
881            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
882
883        raise NotImplementedError(
884            "Docker custom source definitions are not yet supported. "
885            "Only YAML manifest-based custom sources are currently available."
886        )
@dataclass
class CloudOrganization:
@dataclass
class CloudOrganization:
    """A minimal value object describing an Airbyte Cloud organization.

    Returned by `CloudWorkspace.get_organization()`.
    """

    organization_id: str
    """The organization ID."""

    organization_name: str | None = None
    """Display name of the organization."""

Information about an organization in Airbyte Cloud.

This is a minimal value object returned by CloudWorkspace.get_organization().

CloudOrganization(organization_id: str, organization_name: str | None = None)
organization_id: str

The organization ID.

organization_name: str | None = None

Display name of the organization.

@dataclass
class CloudWorkspace:
 88@dataclass
 89class CloudWorkspace:
 90    """A remote workspace on the Airbyte Cloud.
 91
 92    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 93    instances, both OSS and Enterprise.
 94
 95    Two authentication methods are supported (mutually exclusive):
 96    1. OAuth2 client credentials (client_id + client_secret)
 97    2. Bearer token authentication
 98
 99    Example with client credentials:
100        ```python
101        workspace = CloudWorkspace(
102            workspace_id="...",
103            client_id="...",
104            client_secret="...",
105        )
106        ```
107
108    Example with bearer token:
109        ```python
110        workspace = CloudWorkspace(
111            workspace_id="...",
112            bearer_token="...",
113        )
114        ```
115    """
116
117    workspace_id: str
118    client_id: SecretString | None = None
119    client_secret: SecretString | None = None
120    api_root: str = api_util.CLOUD_API_ROOT
121    bearer_token: SecretString | None = None
122
123    # Internal credentials object (set in __post_init__, excluded from __init__)
124    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)
125
126    def __post_init__(self) -> None:
127        """Validate and initialize credentials."""
128        # Wrap secrets in SecretString if provided
129        if self.client_id is not None:
130            self.client_id = SecretString(self.client_id)
131        if self.client_secret is not None:
132            self.client_secret = SecretString(self.client_secret)
133        if self.bearer_token is not None:
134            self.bearer_token = SecretString(self.bearer_token)
135
136        # Create internal CloudClientConfig object (validates mutual exclusivity)
137        self._credentials = CloudClientConfig(
138            client_id=self.client_id,
139            client_secret=self.client_secret,
140            bearer_token=self.bearer_token,
141            api_root=self.api_root,
142        )
143
144    @classmethod
145    def from_env(
146        cls,
147        workspace_id: str | None = None,
148        *,
149        api_root: str | None = None,
150    ) -> CloudWorkspace:
151        """Create a CloudWorkspace using credentials from environment variables.
152
153        This factory method resolves credentials from environment variables,
154        providing a convenient way to create a workspace without explicitly
155        passing credentials.
156
157        Two authentication methods are supported (mutually exclusive):
158        1. Bearer token (checked first)
159        2. OAuth2 client credentials (fallback)
160
161        Environment variables used:
162            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
163            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
164            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
165            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
166            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
167
168        Args:
169            workspace_id: The workspace ID. If not provided, will be resolved from
170                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
171            api_root: The API root URL. If not provided, will be resolved from
172                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
173                the Airbyte Cloud API.
174
175        Returns:
176            A CloudWorkspace instance configured with credentials from the environment.
177
178        Raises:
179            PyAirbyteSecretNotFoundError: If required credentials are not found in
180                the environment.
181
182        Example:
183            ```python
184            # With workspace_id from environment
185            workspace = CloudWorkspace.from_env()
186
187            # With explicit workspace_id
188            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
189            ```
190        """
191        resolved_api_root = resolve_cloud_api_url(api_root)
192
193        # Try bearer token first
194        bearer_token = resolve_cloud_bearer_token()
195        if bearer_token:
196            return cls(
197                workspace_id=resolve_cloud_workspace_id(workspace_id),
198                bearer_token=bearer_token,
199                api_root=resolved_api_root,
200            )
201
202        # Fall back to client credentials
203        return cls(
204            workspace_id=resolve_cloud_workspace_id(workspace_id),
205            client_id=resolve_cloud_client_id(),
206            client_secret=resolve_cloud_client_secret(),
207            api_root=resolved_api_root,
208        )
209
210    @property
211    def workspace_url(self) -> str | None:
212        """The web URL of the workspace."""
213        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
214
215    @cached_property
216    def _organization_info(self) -> dict[str, Any]:
217        """Fetch and cache organization info for this workspace.
218
219        Uses the Config API endpoint for an efficient O(1) lookup.
220        This is an internal method; use get_organization() for public access.
221        """
222        return api_util.get_workspace_organization_info(
223            workspace_id=self.workspace_id,
224            api_root=self.api_root,
225            client_id=self.client_id,
226            client_secret=self.client_secret,
227            bearer_token=self.bearer_token,
228        )
229
230    @overload
231    def get_organization(self) -> CloudOrganization: ...
232
233    @overload
234    def get_organization(
235        self,
236        *,
237        raise_on_error: Literal[True],
238    ) -> CloudOrganization: ...
239
240    @overload
241    def get_organization(
242        self,
243        *,
244        raise_on_error: Literal[False],
245    ) -> CloudOrganization | None: ...
246
247    def get_organization(
248        self,
249        *,
250        raise_on_error: bool = True,
251    ) -> CloudOrganization | None:
252        """Get the organization this workspace belongs to.
253
254        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
255        which may not be available with workspace-scoped credentials.
256
257        Args:
258            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
259                If False, returns None instead of raising.
260
261        Returns:
262            CloudOrganization object with organization_id and organization_name,
263            or None if raise_on_error=False and an error occurred.
264
265        Raises:
266            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
267                (e.g., due to insufficient permissions or missing data).
268        """
269        try:
270            info = self._organization_info
271        except (AirbyteError, NotImplementedError):
272            if raise_on_error:
273                raise
274            return None
275
276        organization_id = info.get("organizationId")
277        organization_name = info.get("organizationName")
278
279        # Validate that both organization_id and organization_name are non-null and non-empty
280        if not organization_id or not organization_name:
281            if raise_on_error:
282                raise AirbyteError(
283                    message="Organization info is incomplete.",
284                    context={
285                        "organization_id": organization_id,
286                        "organization_name": organization_name,
287                    },
288                )
289            return None
290
291        return CloudOrganization(
292            organization_id=organization_id,
293            organization_name=organization_name,
294        )
295
296    # Test connection and creds
297
298    def connect(self) -> None:
299        """Check that the workspace is reachable and raise an exception otherwise.
300
301        Note: It is not necessary to call this method before calling other operations. It
302              serves primarily as a simple check to ensure that the workspace is reachable
303              and credentials are correct.
304        """
305        _ = api_util.get_workspace(
306            api_root=self.api_root,
307            workspace_id=self.workspace_id,
308            client_id=self.client_id,
309            client_secret=self.client_secret,
310            bearer_token=self.bearer_token,
311        )
312        print(f"Successfully connected to workspace: {self.workspace_url}")
313
314    # Get sources, destinations, and connections
315
316    def get_connection(
317        self,
318        connection_id: str,
319    ) -> CloudConnection:
320        """Get a connection by ID.
321
322        This method does not fetch data from the API. It returns a `CloudConnection` object,
323        which will be loaded lazily as needed.
324        """
325        return CloudConnection(
326            workspace=self,
327            connection_id=connection_id,
328        )
329
330    def get_source(
331        self,
332        source_id: str,
333    ) -> CloudSource:
334        """Get a source by ID.
335
336        This method does not fetch data from the API. It returns a `CloudSource` object,
337        which will be loaded lazily as needed.
338        """
339        return CloudSource(
340            workspace=self,
341            connector_id=source_id,
342        )
343
344    def get_destination(
345        self,
346        destination_id: str,
347    ) -> CloudDestination:
348        """Get a destination by ID.
349
350        This method does not fetch data from the API. It returns a `CloudDestination` object,
351        which will be loaded lazily as needed.
352        """
353        return CloudDestination(
354            workspace=self,
355            connector_id=destination_id,
356        )
357
358    # Deploy sources and destinations
359
360    def deploy_source(
361        self,
362        name: str,
363        source: Source,
364        *,
365        unique: bool = True,
366        random_name_suffix: bool = False,
367    ) -> CloudSource:
368        """Deploy a source to the workspace.
369
370        Returns the newly deployed source.
371
372        Args:
373            name: The name to use when deploying.
374            source: The source object to deploy.
375            unique: Whether to require a unique name. If `True`, duplicate names
376                are not allowed. Defaults to `True`.
377            random_name_suffix: Whether to append a random suffix to the name.
378        """
379        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
380        source_config_dict["sourceType"] = source.name.replace("source-", "")
381
382        if random_name_suffix:
383            name += f" (ID: {text_util.generate_random_suffix()})"
384
385        if unique:
386            existing = self.list_sources(name=name)
387            if existing:
388                raise exc.AirbyteDuplicateResourcesError(
389                    resource_type="source",
390                    resource_name=name,
391                )
392
393        deployed_source = api_util.create_source(
394            name=name,
395            api_root=self.api_root,
396            workspace_id=self.workspace_id,
397            config=source_config_dict,
398            client_id=self.client_id,
399            client_secret=self.client_secret,
400            bearer_token=self.bearer_token,
401        )
402        return CloudSource(
403            workspace=self,
404            connector_id=deployed_source.source_id,
405        )
406
407    def deploy_destination(
408        self,
409        name: str,
410        destination: Destination | dict[str, Any],
411        *,
412        unique: bool = True,
413        random_name_suffix: bool = False,
414    ) -> CloudDestination:
415        """Deploy a destination to the workspace.
416
417        Returns the newly deployed destination ID.
418
419        Args:
420            name: The name to use when deploying.
421            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
422                dictionary of configuration values.
423            unique: Whether to require a unique name. If `True`, duplicate names
424                are not allowed. Defaults to `True`.
425            random_name_suffix: Whether to append a random suffix to the name.
426        """
427        if isinstance(destination, Destination):
428            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
429            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
430            # raise ValueError(destination_conf_dict)
431        else:
432            destination_conf_dict = destination.copy()
433            if "destinationType" not in destination_conf_dict:
434                raise exc.PyAirbyteInputError(
435                    message="Missing `destinationType` in configuration dictionary.",
436                )
437
438        if random_name_suffix:
439            name += f" (ID: {text_util.generate_random_suffix()})"
440
441        if unique:
442            existing = self.list_destinations(name=name)
443            if existing:
444                raise exc.AirbyteDuplicateResourcesError(
445                    resource_type="destination",
446                    resource_name=name,
447                )
448
449        deployed_destination = api_util.create_destination(
450            name=name,
451            api_root=self.api_root,
452            workspace_id=self.workspace_id,
453            config=destination_conf_dict,  # Wants a dataclass but accepts dict
454            client_id=self.client_id,
455            client_secret=self.client_secret,
456            bearer_token=self.bearer_token,
457        )
458        return CloudDestination(
459            workspace=self,
460            connector_id=deployed_destination.destination_id,
461        )
462
463    def permanently_delete_source(
464        self,
465        source: str | CloudSource,
466        *,
467        safe_mode: bool = True,
468    ) -> None:
469        """Delete a source from the workspace.
470
471        You can pass either the source ID `str` or a deployed `Source` object.
472
473        Args:
474            source: The source ID or CloudSource object to delete
475            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
476                (case insensitive) to prevent accidental deletion. Defaults to True.
477        """
478        if not isinstance(source, (str, CloudSource)):
479            raise exc.PyAirbyteInputError(
480                message="Invalid source type.",
481                input_value=type(source).__name__,
482            )
483
484        api_util.delete_source(
485            source_id=source.connector_id if isinstance(source, CloudSource) else source,
486            source_name=source.name if isinstance(source, CloudSource) else None,
487            api_root=self.api_root,
488            client_id=self.client_id,
489            client_secret=self.client_secret,
490            bearer_token=self.bearer_token,
491            safe_mode=safe_mode,
492        )
493
494    # Deploy and delete destinations
495
496    def permanently_delete_destination(
497        self,
498        destination: str | CloudDestination,
499        *,
500        safe_mode: bool = True,
501    ) -> None:
502        """Delete a deployed destination from the workspace.
503
504        You can pass either the `Cache` class or the deployed destination ID as a `str`.
505
506        Args:
507            destination: The destination ID or CloudDestination object to delete
508            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
509                (case insensitive) to prevent accidental deletion. Defaults to True.
510        """
511        if not isinstance(destination, (str, CloudDestination)):
512            raise exc.PyAirbyteInputError(
513                message="Invalid destination type.",
514                input_value=type(destination).__name__,
515            )
516
517        api_util.delete_destination(
518            destination_id=(
519                destination if isinstance(destination, str) else destination.destination_id
520            ),
521            destination_name=(
522                destination.name if isinstance(destination, CloudDestination) else None
523            ),
524            api_root=self.api_root,
525            client_id=self.client_id,
526            client_secret=self.client_secret,
527            bearer_token=self.bearer_token,
528            safe_mode=safe_mode,
529        )
530
531    # Deploy and delete connections
532
533    def deploy_connection(
534        self,
535        connection_name: str,
536        *,
537        source: CloudSource | str,
538        selected_streams: list[str],
539        destination: CloudDestination | str,
540        table_prefix: str | None = None,
541    ) -> CloudConnection:
542        """Create a new connection between an already deployed source and destination.
543
544        Returns the newly deployed connection object.
545
546        Args:
547            connection_name: The name of the connection.
548            source: The deployed source. You can pass a source ID or a CloudSource object.
549            destination: The deployed destination. You can pass a destination ID or a
550                CloudDestination object.
551            table_prefix: Optional. The table prefix to use when syncing to the destination.
552            selected_streams: The selected stream names to sync within the connection.
553        """
554        if not selected_streams:
555            raise exc.PyAirbyteInputError(
556                guidance="You must provide `selected_streams` when creating a connection."
557            )
558
559        source_id: str = source if isinstance(source, str) else source.connector_id
560        destination_id: str = (
561            destination if isinstance(destination, str) else destination.connector_id
562        )
563
564        deployed_connection = api_util.create_connection(
565            name=connection_name,
566            source_id=source_id,
567            destination_id=destination_id,
568            api_root=self.api_root,
569            workspace_id=self.workspace_id,
570            selected_stream_names=selected_streams,
571            prefix=table_prefix or "",
572            client_id=self.client_id,
573            client_secret=self.client_secret,
574            bearer_token=self.bearer_token,
575        )
576
577        return CloudConnection(
578            workspace=self,
579            connection_id=deployed_connection.connection_id,
580            source=deployed_connection.source_id,
581            destination=deployed_connection.destination_id,
582        )
583
584    def permanently_delete_connection(
585        self,
586        connection: str | CloudConnection,
587        *,
588        cascade_delete_source: bool = False,
589        cascade_delete_destination: bool = False,
590        safe_mode: bool = True,
591    ) -> None:
592        """Delete a deployed connection from the workspace.
593
594        Args:
595            connection: The connection ID or CloudConnection object to delete
596            cascade_delete_source: If True, also delete the source after deleting the connection
597            cascade_delete_destination: If True, also delete the destination after deleting
598                the connection
599            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
600                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
601                to cascade deletes.
602        """
603        if connection is None:
604            raise ValueError("No connection ID provided.")
605
606        if isinstance(connection, str):
607            connection = CloudConnection(
608                workspace=self,
609                connection_id=connection,
610            )
611
612        api_util.delete_connection(
613            connection_id=connection.connection_id,
614            connection_name=connection.name,
615            api_root=self.api_root,
616            workspace_id=self.workspace_id,
617            client_id=self.client_id,
618            client_secret=self.client_secret,
619            bearer_token=self.bearer_token,
620            safe_mode=safe_mode,
621        )
622
623        if cascade_delete_source:
624            self.permanently_delete_source(
625                source=connection.source_id,
626                safe_mode=safe_mode,
627            )
628        if cascade_delete_destination:
629            self.permanently_delete_destination(
630                destination=connection.destination_id,
631                safe_mode=safe_mode,
632            )
633
634    # List sources, destinations, and connections
635
636    def list_connections(
637        self,
638        name: str | None = None,
639        *,
640        name_filter: Callable | None = None,
641    ) -> list[CloudConnection]:
642        """List connections by name in the workspace.
643
644        TODO: Add pagination support
645        """
646        connections = api_util.list_connections(
647            api_root=self.api_root,
648            workspace_id=self.workspace_id,
649            name=name,
650            name_filter=name_filter,
651            client_id=self.client_id,
652            client_secret=self.client_secret,
653            bearer_token=self.bearer_token,
654        )
655        return [
656            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
657                workspace=self,
658                connection_response=connection,
659            )
660            for connection in connections
661            if name is None or connection.name == name
662        ]
663
664    def list_sources(
665        self,
666        name: str | None = None,
667        *,
668        name_filter: Callable | None = None,
669    ) -> list[CloudSource]:
670        """List all sources in the workspace.
671
672        TODO: Add pagination support
673        """
674        sources = api_util.list_sources(
675            api_root=self.api_root,
676            workspace_id=self.workspace_id,
677            name=name,
678            name_filter=name_filter,
679            client_id=self.client_id,
680            client_secret=self.client_secret,
681            bearer_token=self.bearer_token,
682        )
683        return [
684            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
685                workspace=self,
686                source_response=source,
687            )
688            for source in sources
689            if name is None or source.name == name
690        ]
691
692    def list_destinations(
693        self,
694        name: str | None = None,
695        *,
696        name_filter: Callable | None = None,
697    ) -> list[CloudDestination]:
698        """List all destinations in the workspace.
699
700        TODO: Add pagination support
701        """
702        destinations = api_util.list_destinations(
703            api_root=self.api_root,
704            workspace_id=self.workspace_id,
705            name=name,
706            name_filter=name_filter,
707            client_id=self.client_id,
708            client_secret=self.client_secret,
709            bearer_token=self.bearer_token,
710        )
711        return [
712            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
713                workspace=self,
714                destination_response=destination,
715            )
716            for destination in destinations
717            if name is None or destination.name == name
718        ]
719
720    def publish_custom_source_definition(
721        self,
722        name: str,
723        *,
724        manifest_yaml: dict[str, Any] | Path | str | None = None,
725        docker_image: str | None = None,
726        docker_tag: str | None = None,
727        unique: bool = True,
728        pre_validate: bool = True,
729        testing_values: dict[str, Any] | None = None,
730    ) -> CustomCloudSourceDefinition:
731        """Publish a custom source connector definition.
732
733        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
734        and docker_tag (for Docker connectors), but not both.
735
736        Args:
737            name: Display name for the connector definition
738            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
739            docker_image: Docker repository (e.g., 'airbyte/source-custom')
740            docker_tag: Docker image tag (e.g., '1.0.0')
741            unique: Whether to enforce name uniqueness
742            pre_validate: Whether to validate manifest client-side (YAML only)
743            testing_values: Optional configuration values to use for testing in the
744                Connector Builder UI. If provided, these values are stored as the complete
745                testing values object for the connector builder project (replaces any existing
746                values), allowing immediate test read operations.
747
748        Returns:
749            CustomCloudSourceDefinition object representing the created definition
750
751        Raises:
752            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
753            AirbyteDuplicateResourcesError: If unique=True and name already exists
754        """
755        is_yaml = manifest_yaml is not None
756        is_docker = docker_image is not None
757
758        if is_yaml == is_docker:
759            raise exc.PyAirbyteInputError(
760                message=(
761                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
762                    "docker_image + docker_tag (for Docker connectors), but not both"
763                ),
764                context={
765                    "manifest_yaml_provided": is_yaml,
766                    "docker_image_provided": is_docker,
767                },
768            )
769
770        if is_docker and docker_tag is None:
771            raise exc.PyAirbyteInputError(
772                message="docker_tag is required when docker_image is specified",
773                context={"docker_image": docker_image},
774            )
775
776        if unique:
777            existing = self.list_custom_source_definitions(
778                definition_type="yaml" if is_yaml else "docker",
779            )
780            if any(d.name == name for d in existing):
781                raise exc.AirbyteDuplicateResourcesError(
782                    resource_type="custom_source_definition",
783                    resource_name=name,
784                )
785
786        if is_yaml:
787            manifest_dict: dict[str, Any]
788            if isinstance(manifest_yaml, Path):
789                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
790            elif isinstance(manifest_yaml, str):
791                manifest_dict = yaml.safe_load(manifest_yaml)
792            elif manifest_yaml is not None:
793                manifest_dict = manifest_yaml
794            else:
795                raise exc.PyAirbyteInputError(
796                    message="manifest_yaml is required for YAML connectors",
797                    context={"name": name},
798                )
799
800            if pre_validate:
801                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
802
803            result = api_util.create_custom_yaml_source_definition(
804                name=name,
805                workspace_id=self.workspace_id,
806                manifest=manifest_dict,
807                api_root=self.api_root,
808                client_id=self.client_id,
809                client_secret=self.client_secret,
810                bearer_token=self.bearer_token,
811            )
812            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
813                self, result
814            )
815
816            # Set testing values if provided
817            if testing_values is not None:
818                custom_definition.set_testing_values(testing_values)
819
820            return custom_definition
821
822        raise NotImplementedError(
823            "Docker custom source definitions are not yet supported. "
824            "Only YAML manifest-based custom sources are currently available."
825        )
826
827    def list_custom_source_definitions(
828        self,
829        *,
830        definition_type: Literal["yaml", "docker"],
831    ) -> list[CustomCloudSourceDefinition]:
832        """List custom source connector definitions.
833
834        Args:
835            definition_type: Connector type to list ("yaml" or "docker"). Required.
836
837        Returns:
838            List of CustomCloudSourceDefinition objects matching the specified type
839        """
840        if definition_type == "yaml":
841            yaml_definitions = api_util.list_custom_yaml_source_definitions(
842                workspace_id=self.workspace_id,
843                api_root=self.api_root,
844                client_id=self.client_id,
845                client_secret=self.client_secret,
846                bearer_token=self.bearer_token,
847            )
848            return [
849                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
850                for d in yaml_definitions
851            ]
852
853        raise NotImplementedError(
854            "Docker custom source definitions are not yet supported. "
855            "Only YAML manifest-based custom sources are currently available."
856        )
857
858    def get_custom_source_definition(
859        self,
860        definition_id: str,
861        *,
862        definition_type: Literal["yaml", "docker"],
863    ) -> CustomCloudSourceDefinition:
864        """Get a specific custom source definition by ID.
865
866        Args:
867            definition_id: The definition ID
868            definition_type: Connector type ("yaml" or "docker"). Required.
869
870        Returns:
871            CustomCloudSourceDefinition object
872        """
873        if definition_type == "yaml":
874            result = api_util.get_custom_yaml_source_definition(
875                workspace_id=self.workspace_id,
876                definition_id=definition_id,
877                api_root=self.api_root,
878                client_id=self.client_id,
879                client_secret=self.client_secret,
880                bearer_token=self.bearer_token,
881            )
882            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
Example with bearer token:
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
CloudWorkspace(workspace_id: str, client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1', bearer_token: airbyte.secrets.SecretString | None = None)
workspace_id: str
client_id: airbyte.secrets.SecretString | None = None
client_secret: airbyte.secrets.SecretString | None = None
api_root: str = 'https://api.airbyte.com/v1'
bearer_token: airbyte.secrets.SecretString | None = None
@classmethod
def from_env( cls, workspace_id: str | None = None, *, api_root: str | None = None) -> CloudWorkspace:
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
) -> CloudWorkspace:
    """Build a `CloudWorkspace` from credentials in environment variables.

    Credential resolution order (mutually exclusive):

    1. Bearer token (`AIRBYTE_CLOUD_BEARER_TOKEN`), checked first.
    2. OAuth2 client credentials (`AIRBYTE_CLOUD_CLIENT_ID` /
       `AIRBYTE_CLOUD_CLIENT_SECRET`) as the fallback.

    The workspace ID falls back to `AIRBYTE_CLOUD_WORKSPACE_ID` when not
    passed, and the API root falls back to `AIRBYTE_CLOUD_API_URL` (or the
    Airbyte Cloud default) when not passed.

    Args:
        workspace_id: Optional explicit workspace ID; resolved from the
            environment when omitted.
        api_root: Optional explicit API root URL; resolved from the
            environment or defaulted when omitted.

    Returns:
        A `CloudWorkspace` configured from the environment.

    Raises:
        PyAirbyteSecretNotFoundError: If required credentials are missing
            from the environment.

    Example:
        ```python
        workspace = CloudWorkspace.from_env()
        workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
        ```
    """
    root = resolve_cloud_api_url(api_root)

    # Bearer token wins when present; otherwise use client credentials.
    token = resolve_cloud_bearer_token()
    if token:
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            bearer_token=token,
            api_root=root,
        )

    return cls(
        workspace_id=resolve_cloud_workspace_id(workspace_id),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=root,
    )

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)
Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
workspace_url: str | None
@property
def workspace_url(self) -> str | None:
    """The web URL of the workspace."""
    web_root = get_web_url_root(self.api_root)
    return f"{web_root}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def get_organization( self, *, raise_on_error: bool = True) -> CloudOrganization | None:
def get_organization(
    self,
    *,
    raise_on_error: bool = True,
) -> CloudOrganization | None:
    """Get the organization this workspace belongs to.

    Fetching organization info requires ORGANIZATION_READER permissions on
    the organization, which may not be available with workspace-scoped
    credentials.

    Args:
        raise_on_error: If True (default), raises AirbyteError on permission
            or API errors. If False, returns None instead of raising.

    Returns:
        CloudOrganization with organization_id and organization_name, or
        None if raise_on_error=False and an error occurred.

    Raises:
        AirbyteError: If raise_on_error=True and the organization info
            cannot be fetched (e.g., insufficient permissions or missing
            data).
    """
    try:
        info = self._organization_info
    except (AirbyteError, NotImplementedError):
        if not raise_on_error:
            return None
        raise

    org_id = info.get("organizationId")
    org_name = info.get("organizationName")

    # Both fields must be present and non-empty to build the result.
    if org_id and org_name:
        return CloudOrganization(
            organization_id=org_id,
            organization_name=org_name,
        )

    if not raise_on_error:
        return None

    raise AirbyteError(
        message="Organization info is incomplete.",
        context={
            "organization_id": org_id,
            "organization_name": org_name,
        },
    )

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
def connect(self) -> None:
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: It is not necessary to call this method before calling other
          operations. It serves primarily as a simple check that the
          workspace is reachable and credentials are correct.
    """
    # The API call itself raises on bad credentials or an unreachable host.
    api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    No API call is made here; the returned `CloudConnection` loads its
    data lazily as needed.
    """
    return CloudConnection(workspace=self, connection_id=connection_id)

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Get a source by ID.

    No API call is made here; the returned `CloudSource` loads its data
    lazily as needed.
    """
    return CloudSource(workspace=self, connector_id=source_id)

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Get a destination by ID.

    No API call is made here; the returned `CloudDestination` loads its
    data lazily as needed.
    """
    return CloudDestination(workspace=self, connector_id=destination_id)

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    # Build the API config payload from the local source's hydrated config.
    config: dict = dict(source._hydrated_config)  # noqa: SLF001 (non-public API)
    config["sourceType"] = source.name.replace("source-", "")

    if random_name_suffix:
        suffix = text_util.generate_random_suffix()
        name = f"{name} (ID: {suffix})"

    # Enforce name uniqueness before creating anything remotely.
    if unique and self.list_sources(name=name):
        raise exc.AirbyteDuplicateResourcesError(
            resource_type="source",
            resource_name=name,
        )

    created = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=config,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudSource(workspace=self, connector_id=created.source_id)

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte
            `Destination` object or a dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    # Normalize the input into an API config dict with a destinationType.
    if isinstance(destination, Destination):
        config: dict = dict(destination._hydrated_config)  # noqa: SLF001 (non-public API)
        config["destinationType"] = destination.name.replace("destination-", "")
    else:
        config = dict(destination)
        if "destinationType" not in config:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        suffix = text_util.generate_random_suffix()
        name = f"{name} (ID: {suffix})"

    # Enforce name uniqueness before creating anything remotely.
    if unique and self.list_destinations(name=name):
        raise exc.AirbyteDuplicateResourcesError(
            resource_type="destination",
            resource_name=name,
        )

    created = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=config,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudDestination(workspace=self, connector_id=created.destination_id)

Deploy a destination to the workspace.

Returns the newly deployed destination object.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID `str` or a deployed `CloudSource`
    object.

    Args:
        source: The source ID or CloudSource object to delete.
        safe_mode: If True, requires the source name to contain "delete-me"
            or "deleteme" (case insensitive) to prevent accidental deletion.
            Defaults to True.
    """
    if not isinstance(source, (str, CloudSource)):
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    # A bare ID string carries no name; the API helper handles None.
    if isinstance(source, CloudSource):
        source_id, source_name = source.connector_id, source.name
    else:
        source_id, source_name = source, None

    api_util.delete_source(
        source_id=source_id,
        source_name=source_name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either a `CloudDestination` object or the deployed
    destination ID as a `str`.

    Args:
        destination: The destination ID or CloudDestination object to delete.
        safe_mode: If True, requires the destination name to contain
            "delete-me" or "deleteme" (case insensitive) to prevent
            accidental deletion. Defaults to True.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    # A bare ID string carries no name; the API helper handles None.
    if isinstance(destination, CloudDestination):
        destination_id = destination.destination_id
        destination_name = destination.name
    else:
        destination_id = destination
        destination_name = None

    api_util.delete_destination(
        destination_id=destination_id,
        destination_name=destination_name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a new connection between an already deployed source and destination.

    Returns the newly deployed connection object.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a
            CloudSource object.
        destination: The deployed destination. You can pass a destination ID
            or a CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the
            destination.
        selected_streams: The selected stream names to sync within the
            connection.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    # Accept either plain IDs or deployed connector objects.
    source_id: str = source.connector_id if isinstance(source, CloudSource) else source
    destination_id: str = (
        destination.connector_id
        if isinstance(destination, CloudDestination)
        else destination
    )

    response = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )

    return CloudConnection(
        workspace=self,
        connection_id=response.connection_id,
        source=response.source_id,
        destination=response.destination_id,
    )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection ID or CloudConnection object to delete.
        cascade_delete_source: If True, also delete the source after
            deleting the connection.
        cascade_delete_destination: If True, also delete the destination
            after deleting the connection.
        safe_mode: If True, requires the connection name to contain
            "delete-me" or "deleteme" (case insensitive) to prevent
            accidental deletion. Defaults to True. Also applies to cascade
            deletes.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")

    # Promote a bare ID to a lazy CloudConnection so we can read its name
    # and connector IDs below.
    if isinstance(connection, str):
        connection = CloudConnection(workspace=self, connection_id=connection)

    api_util.delete_connection(
        connection_id=connection.connection_id,
        connection_name=connection.name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )

    if cascade_delete_source:
        self.permanently_delete_source(
            source=connection.source_id,
            safe_mode=safe_mode,
        )
    if cascade_delete_destination:
        self.permanently_delete_destination(
            destination=connection.destination_id,
            safe_mode=safe_mode,
        )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
    """List connections by name in the workspace.

    TODO: Add pagination support
    """
    responses = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    # Re-apply the exact-name filter client-side as a safety net.
    matching = (r for r in responses if name is None or r.name == name)
    return [
        CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            connection_response=response,
        )
        for response in matching
    ]

List connections by name in the workspace.

TODO: Add pagination support

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
    """List all sources in the workspace.

    TODO: Add pagination support
    """
    responses = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    # Re-apply the exact-name filter client-side as a safety net.
    matching = (r for r in responses if name is None or r.name == name)
    return [
        CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            source_response=response,
        )
        for response in matching
    ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace.

    TODO: Add pagination support
    """
    responses = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    # Re-apply the exact-name filter client-side as a safety net.
    matching = (r for r in responses if name is None or r.name == name)
    return [
        CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            destination_response=response,
        )
        for response in matching
    ]

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both
    docker_image and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition.
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or
            YAML string).
        docker_image: Docker repository (e.g., 'airbyte/source-custom').
        docker_tag: Docker image tag (e.g., '1.0.0').
        unique: Whether to enforce name uniqueness.
        pre_validate: Whether to validate manifest client-side (YAML only).
        testing_values: Optional configuration values to use for testing in
            the Connector Builder UI. If provided, these values are stored
            as the complete testing values object for the connector builder
            project (replaces any existing values), allowing immediate test
            read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created
        definition.

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and
            docker_image provided.
        AirbyteDuplicateResourcesError: If unique=True and name already
            exists.
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    # Exactly one of the two connector flavors must be specified.
    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing_definitions = self.list_custom_source_definitions(
            definition_type="yaml" if is_yaml else "docker",
        )
        if any(existing.name == name for existing in existing_definitions):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if not is_yaml:
        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    # Normalize the manifest input into a plain dict.
    manifest_dict: dict[str, Any]
    if isinstance(manifest_yaml, Path):
        manifest_dict = yaml.safe_load(manifest_yaml.read_text())
    elif isinstance(manifest_yaml, str):
        manifest_dict = yaml.safe_load(manifest_yaml)
    elif manifest_yaml is not None:
        manifest_dict = manifest_yaml
    else:
        raise exc.PyAirbyteInputError(
            message="manifest_yaml is required for YAML connectors",
            context={"name": name},
        )

    if pre_validate:
        api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

    result = api_util.create_custom_yaml_source_definition(
        name=name,
        workspace_id=self.workspace_id,
        manifest=manifest_dict,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
        self, result
    )

    # Optionally seed the Connector Builder testing values.
    if testing_values is not None:
        custom_definition.set_testing_values(testing_values)

    return custom_definition

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
827    def list_custom_source_definitions(
828        self,
829        *,
830        definition_type: Literal["yaml", "docker"],
831    ) -> list[CustomCloudSourceDefinition]:
832        """List custom source connector definitions.
833
834        Args:
835            definition_type: Connector type to list ("yaml" or "docker"). Required.
836
837        Returns:
838            List of CustomCloudSourceDefinition objects matching the specified type
839        """
840        if definition_type == "yaml":
841            yaml_definitions = api_util.list_custom_yaml_source_definitions(
842                workspace_id=self.workspace_id,
843                api_root=self.api_root,
844                client_id=self.client_id,
845                client_secret=self.client_secret,
846                bearer_token=self.bearer_token,
847            )
848            return [
849                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
850                for d in yaml_definitions
851            ]
852
853        raise NotImplementedError(
854            "Docker custom source definitions are not yet supported. "
855            "Only YAML manifest-based custom sources are currently available."
856        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
  List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
858    def get_custom_source_definition(
859        self,
860        definition_id: str,
861        *,
862        definition_type: Literal["yaml", "docker"],
863    ) -> CustomCloudSourceDefinition:
864        """Get a specific custom source definition by ID.
865
866        Args:
867            definition_id: The definition ID
868            definition_type: Connector type ("yaml" or "docker"). Required.
869
870        Returns:
871            CustomCloudSourceDefinition object
872        """
873        if definition_type == "yaml":
874            result = api_util.get_custom_yaml_source_definition(
875                workspace_id=self.workspace_id,
876                definition_id=definition_id,
877                api_root=self.api_root,
878                client_id=self.client_id,
879                client_secret=self.client_secret,
880                bearer_token=self.bearer_token,
881            )
882            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
  CustomCloudSourceDefinition object