airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass
 39from functools import cached_property
 40from pathlib import Path
 41from typing import TYPE_CHECKING, Any, Literal
 42
 43import yaml
 44
 45from airbyte import exceptions as exc
 46from airbyte._util import api_util, text_util
 47from airbyte._util.api_util import get_web_url_root
 48from airbyte.cloud.connections import CloudConnection
 49from airbyte.cloud.connectors import (
 50    CloudDestination,
 51    CloudSource,
 52    CustomCloudSourceDefinition,
 53)
 54from airbyte.destinations.base import Destination
 55from airbyte.secrets.base import SecretString
 56
 57
 58if TYPE_CHECKING:
 59    from collections.abc import Callable
 60
 61    from airbyte.sources.base import Source
 62
 63
 64@dataclass
 65class CloudWorkspace:
 66    """A remote workspace on the Airbyte Cloud.
 67
 68    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 69    instances, both OSS and Enterprise.
 70    """
 71
 72    workspace_id: str
 73    client_id: SecretString
 74    client_secret: SecretString
 75    api_root: str = api_util.CLOUD_API_ROOT
 76
 77    def __post_init__(self) -> None:
 78        """Ensure that the client ID and secret are handled securely."""
 79        self.client_id = SecretString(self.client_id)
 80        self.client_secret = SecretString(self.client_secret)
 81
 82    @property
 83    def workspace_url(self) -> str | None:
 84        """The web URL of the workspace."""
 85        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 86
 87    @cached_property
 88    def _organization_info(self) -> dict[str, Any]:
 89        """Fetch and cache organization info for this workspace.
 90
 91        Uses the Config API endpoint for an efficient O(1) lookup.
 92        """
 93        return api_util.get_workspace_organization_info(
 94            workspace_id=self.workspace_id,
 95            api_root=self.api_root,
 96            client_id=self.client_id,
 97            client_secret=self.client_secret,
 98        )
 99
100    @property
101    def organization_id(self) -> str | None:
102        """The ID of the organization this workspace belongs to.
103
104        This value is cached after the first lookup.
105        """
106        return self._organization_info.get("organizationId")
107
108    @property
109    def organization_name(self) -> str | None:
110        """The name of the organization this workspace belongs to.
111
112        This value is cached after the first lookup.
113        """
114        return self._organization_info.get("organizationName")
115
116    # Test connection and creds
117
118    def connect(self) -> None:
119        """Check that the workspace is reachable and raise an exception otherwise.
120
121        Note: It is not necessary to call this method before calling other operations. It
122              serves primarily as a simple check to ensure that the workspace is reachable
123              and credentials are correct.
124        """
125        _ = api_util.get_workspace(
126            api_root=self.api_root,
127            workspace_id=self.workspace_id,
128            client_id=self.client_id,
129            client_secret=self.client_secret,
130        )
131        print(f"Successfully connected to workspace: {self.workspace_url}")
132
133    # Get sources, destinations, and connections
134
135    def get_connection(
136        self,
137        connection_id: str,
138    ) -> CloudConnection:
139        """Get a connection by ID.
140
141        This method does not fetch data from the API. It returns a `CloudConnection` object,
142        which will be loaded lazily as needed.
143        """
144        return CloudConnection(
145            workspace=self,
146            connection_id=connection_id,
147        )
148
149    def get_source(
150        self,
151        source_id: str,
152    ) -> CloudSource:
153        """Get a source by ID.
154
155        This method does not fetch data from the API. It returns a `CloudSource` object,
156        which will be loaded lazily as needed.
157        """
158        return CloudSource(
159            workspace=self,
160            connector_id=source_id,
161        )
162
163    def get_destination(
164        self,
165        destination_id: str,
166    ) -> CloudDestination:
167        """Get a destination by ID.
168
169        This method does not fetch data from the API. It returns a `CloudDestination` object,
170        which will be loaded lazily as needed.
171        """
172        return CloudDestination(
173            workspace=self,
174            connector_id=destination_id,
175        )
176
177    # Deploy sources and destinations
178
179    def deploy_source(
180        self,
181        name: str,
182        source: Source,
183        *,
184        unique: bool = True,
185        random_name_suffix: bool = False,
186    ) -> CloudSource:
187        """Deploy a source to the workspace.
188
189        Returns the newly deployed source.
190
191        Args:
192            name: The name to use when deploying.
193            source: The source object to deploy.
194            unique: Whether to require a unique name. If `True`, duplicate names
195                are not allowed. Defaults to `True`.
196            random_name_suffix: Whether to append a random suffix to the name.
197        """
198        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
199        source_config_dict["sourceType"] = source.name.replace("source-", "")
200
201        if random_name_suffix:
202            name += f" (ID: {text_util.generate_random_suffix()})"
203
204        if unique:
205            existing = self.list_sources(name=name)
206            if existing:
207                raise exc.AirbyteDuplicateResourcesError(
208                    resource_type="source",
209                    resource_name=name,
210                )
211
212        deployed_source = api_util.create_source(
213            name=name,
214            api_root=self.api_root,
215            workspace_id=self.workspace_id,
216            config=source_config_dict,
217            client_id=self.client_id,
218            client_secret=self.client_secret,
219        )
220        return CloudSource(
221            workspace=self,
222            connector_id=deployed_source.source_id,
223        )
224
225    def deploy_destination(
226        self,
227        name: str,
228        destination: Destination | dict[str, Any],
229        *,
230        unique: bool = True,
231        random_name_suffix: bool = False,
232    ) -> CloudDestination:
233        """Deploy a destination to the workspace.
234
235        Returns the newly deployed destination ID.
236
237        Args:
238            name: The name to use when deploying.
239            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
240                dictionary of configuration values.
241            unique: Whether to require a unique name. If `True`, duplicate names
242                are not allowed. Defaults to `True`.
243            random_name_suffix: Whether to append a random suffix to the name.
244        """
245        if isinstance(destination, Destination):
246            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
247            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
248            # raise ValueError(destination_conf_dict)
249        else:
250            destination_conf_dict = destination.copy()
251            if "destinationType" not in destination_conf_dict:
252                raise exc.PyAirbyteInputError(
253                    message="Missing `destinationType` in configuration dictionary.",
254                )
255
256        if random_name_suffix:
257            name += f" (ID: {text_util.generate_random_suffix()})"
258
259        if unique:
260            existing = self.list_destinations(name=name)
261            if existing:
262                raise exc.AirbyteDuplicateResourcesError(
263                    resource_type="destination",
264                    resource_name=name,
265                )
266
267        deployed_destination = api_util.create_destination(
268            name=name,
269            api_root=self.api_root,
270            workspace_id=self.workspace_id,
271            config=destination_conf_dict,  # Wants a dataclass but accepts dict
272            client_id=self.client_id,
273            client_secret=self.client_secret,
274        )
275        return CloudDestination(
276            workspace=self,
277            connector_id=deployed_destination.destination_id,
278        )
279
280    def permanently_delete_source(
281        self,
282        source: str | CloudSource,
283        *,
284        safe_mode: bool = True,
285    ) -> None:
286        """Delete a source from the workspace.
287
288        You can pass either the source ID `str` or a deployed `Source` object.
289
290        Args:
291            source: The source ID or CloudSource object to delete
292            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
293                (case insensitive) to prevent accidental deletion. Defaults to True.
294        """
295        if not isinstance(source, (str, CloudSource)):
296            raise exc.PyAirbyteInputError(
297                message="Invalid source type.",
298                input_value=type(source).__name__,
299            )
300
301        api_util.delete_source(
302            source_id=source.connector_id if isinstance(source, CloudSource) else source,
303            source_name=source.name if isinstance(source, CloudSource) else None,
304            api_root=self.api_root,
305            client_id=self.client_id,
306            client_secret=self.client_secret,
307            safe_mode=safe_mode,
308        )
309
310    # Deploy and delete destinations
311
312    def permanently_delete_destination(
313        self,
314        destination: str | CloudDestination,
315        *,
316        safe_mode: bool = True,
317    ) -> None:
318        """Delete a deployed destination from the workspace.
319
320        You can pass either the `Cache` class or the deployed destination ID as a `str`.
321
322        Args:
323            destination: The destination ID or CloudDestination object to delete
324            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
325                (case insensitive) to prevent accidental deletion. Defaults to True.
326        """
327        if not isinstance(destination, (str, CloudDestination)):
328            raise exc.PyAirbyteInputError(
329                message="Invalid destination type.",
330                input_value=type(destination).__name__,
331            )
332
333        api_util.delete_destination(
334            destination_id=(
335                destination if isinstance(destination, str) else destination.destination_id
336            ),
337            destination_name=(
338                destination.name if isinstance(destination, CloudDestination) else None
339            ),
340            api_root=self.api_root,
341            client_id=self.client_id,
342            client_secret=self.client_secret,
343            safe_mode=safe_mode,
344        )
345
346    # Deploy and delete connections
347
348    def deploy_connection(
349        self,
350        connection_name: str,
351        *,
352        source: CloudSource | str,
353        selected_streams: list[str],
354        destination: CloudDestination | str,
355        table_prefix: str | None = None,
356    ) -> CloudConnection:
357        """Create a new connection between an already deployed source and destination.
358
359        Returns the newly deployed connection object.
360
361        Args:
362            connection_name: The name of the connection.
363            source: The deployed source. You can pass a source ID or a CloudSource object.
364            destination: The deployed destination. You can pass a destination ID or a
365                CloudDestination object.
366            table_prefix: Optional. The table prefix to use when syncing to the destination.
367            selected_streams: The selected stream names to sync within the connection.
368        """
369        if not selected_streams:
370            raise exc.PyAirbyteInputError(
371                guidance="You must provide `selected_streams` when creating a connection."
372            )
373
374        source_id: str = source if isinstance(source, str) else source.connector_id
375        destination_id: str = (
376            destination if isinstance(destination, str) else destination.connector_id
377        )
378
379        deployed_connection = api_util.create_connection(
380            name=connection_name,
381            source_id=source_id,
382            destination_id=destination_id,
383            api_root=self.api_root,
384            workspace_id=self.workspace_id,
385            selected_stream_names=selected_streams,
386            prefix=table_prefix or "",
387            client_id=self.client_id,
388            client_secret=self.client_secret,
389        )
390
391        return CloudConnection(
392            workspace=self,
393            connection_id=deployed_connection.connection_id,
394            source=deployed_connection.source_id,
395            destination=deployed_connection.destination_id,
396        )
397
398    def permanently_delete_connection(
399        self,
400        connection: str | CloudConnection,
401        *,
402        cascade_delete_source: bool = False,
403        cascade_delete_destination: bool = False,
404        safe_mode: bool = True,
405    ) -> None:
406        """Delete a deployed connection from the workspace.
407
408        Args:
409            connection: The connection ID or CloudConnection object to delete
410            cascade_delete_source: If True, also delete the source after deleting the connection
411            cascade_delete_destination: If True, also delete the destination after deleting
412                the connection
413            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
414                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
415                to cascade deletes.
416        """
417        if connection is None:
418            raise ValueError("No connection ID provided.")
419
420        if isinstance(connection, str):
421            connection = CloudConnection(
422                workspace=self,
423                connection_id=connection,
424            )
425
426        api_util.delete_connection(
427            connection_id=connection.connection_id,
428            connection_name=connection.name,
429            api_root=self.api_root,
430            workspace_id=self.workspace_id,
431            client_id=self.client_id,
432            client_secret=self.client_secret,
433            safe_mode=safe_mode,
434        )
435
436        if cascade_delete_source:
437            self.permanently_delete_source(
438                source=connection.source_id,
439                safe_mode=safe_mode,
440            )
441        if cascade_delete_destination:
442            self.permanently_delete_destination(
443                destination=connection.destination_id,
444                safe_mode=safe_mode,
445            )
446
447    # List sources, destinations, and connections
448
449    def list_connections(
450        self,
451        name: str | None = None,
452        *,
453        name_filter: Callable | None = None,
454    ) -> list[CloudConnection]:
455        """List connections by name in the workspace.
456
457        TODO: Add pagination support
458        """
459        connections = api_util.list_connections(
460            api_root=self.api_root,
461            workspace_id=self.workspace_id,
462            name=name,
463            name_filter=name_filter,
464            client_id=self.client_id,
465            client_secret=self.client_secret,
466        )
467        return [
468            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
469                workspace=self,
470                connection_response=connection,
471            )
472            for connection in connections
473            if name is None or connection.name == name
474        ]
475
476    def list_sources(
477        self,
478        name: str | None = None,
479        *,
480        name_filter: Callable | None = None,
481    ) -> list[CloudSource]:
482        """List all sources in the workspace.
483
484        TODO: Add pagination support
485        """
486        sources = api_util.list_sources(
487            api_root=self.api_root,
488            workspace_id=self.workspace_id,
489            name=name,
490            name_filter=name_filter,
491            client_id=self.client_id,
492            client_secret=self.client_secret,
493        )
494        return [
495            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
496                workspace=self,
497                source_response=source,
498            )
499            for source in sources
500            if name is None or source.name == name
501        ]
502
503    def list_destinations(
504        self,
505        name: str | None = None,
506        *,
507        name_filter: Callable | None = None,
508    ) -> list[CloudDestination]:
509        """List all destinations in the workspace.
510
511        TODO: Add pagination support
512        """
513        destinations = api_util.list_destinations(
514            api_root=self.api_root,
515            workspace_id=self.workspace_id,
516            name=name,
517            name_filter=name_filter,
518            client_id=self.client_id,
519            client_secret=self.client_secret,
520        )
521        return [
522            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
523                workspace=self,
524                destination_response=destination,
525            )
526            for destination in destinations
527            if name is None or destination.name == name
528        ]
529
530    def publish_custom_source_definition(
531        self,
532        name: str,
533        *,
534        manifest_yaml: dict[str, Any] | Path | str | None = None,
535        docker_image: str | None = None,
536        docker_tag: str | None = None,
537        unique: bool = True,
538        pre_validate: bool = True,
539        testing_values: dict[str, Any] | None = None,
540    ) -> CustomCloudSourceDefinition:
541        """Publish a custom source connector definition.
542
543        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
544        and docker_tag (for Docker connectors), but not both.
545
546        Args:
547            name: Display name for the connector definition
548            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
549            docker_image: Docker repository (e.g., 'airbyte/source-custom')
550            docker_tag: Docker image tag (e.g., '1.0.0')
551            unique: Whether to enforce name uniqueness
552            pre_validate: Whether to validate manifest client-side (YAML only)
553            testing_values: Optional configuration values to use for testing in the
554                Connector Builder UI. If provided, these values are stored as the complete
555                testing values object for the connector builder project (replaces any existing
556                values), allowing immediate test read operations.
557
558        Returns:
559            CustomCloudSourceDefinition object representing the created definition
560
561        Raises:
562            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
563            AirbyteDuplicateResourcesError: If unique=True and name already exists
564        """
565        is_yaml = manifest_yaml is not None
566        is_docker = docker_image is not None
567
568        if is_yaml == is_docker:
569            raise exc.PyAirbyteInputError(
570                message=(
571                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
572                    "docker_image + docker_tag (for Docker connectors), but not both"
573                ),
574                context={
575                    "manifest_yaml_provided": is_yaml,
576                    "docker_image_provided": is_docker,
577                },
578            )
579
580        if is_docker and docker_tag is None:
581            raise exc.PyAirbyteInputError(
582                message="docker_tag is required when docker_image is specified",
583                context={"docker_image": docker_image},
584            )
585
586        if unique:
587            existing = self.list_custom_source_definitions(
588                definition_type="yaml" if is_yaml else "docker",
589            )
590            if any(d.name == name for d in existing):
591                raise exc.AirbyteDuplicateResourcesError(
592                    resource_type="custom_source_definition",
593                    resource_name=name,
594                )
595
596        if is_yaml:
597            manifest_dict: dict[str, Any]
598            if isinstance(manifest_yaml, Path):
599                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
600            elif isinstance(manifest_yaml, str):
601                manifest_dict = yaml.safe_load(manifest_yaml)
602            elif manifest_yaml is not None:
603                manifest_dict = manifest_yaml
604            else:
605                raise exc.PyAirbyteInputError(
606                    message="manifest_yaml is required for YAML connectors",
607                    context={"name": name},
608                )
609
610            if pre_validate:
611                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
612
613            result = api_util.create_custom_yaml_source_definition(
614                name=name,
615                workspace_id=self.workspace_id,
616                manifest=manifest_dict,
617                api_root=self.api_root,
618                client_id=self.client_id,
619                client_secret=self.client_secret,
620            )
621            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
622                self, result
623            )
624
625            # Set testing values if provided
626            if testing_values is not None:
627                custom_definition.set_testing_values(testing_values)
628
629            return custom_definition
630
631        raise NotImplementedError(
632            "Docker custom source definitions are not yet supported. "
633            "Only YAML manifest-based custom sources are currently available."
634        )
635
636    def list_custom_source_definitions(
637        self,
638        *,
639        definition_type: Literal["yaml", "docker"],
640    ) -> list[CustomCloudSourceDefinition]:
641        """List custom source connector definitions.
642
643        Args:
644            definition_type: Connector type to list ("yaml" or "docker"). Required.
645
646        Returns:
647            List of CustomCloudSourceDefinition objects matching the specified type
648        """
649        if definition_type == "yaml":
650            yaml_definitions = api_util.list_custom_yaml_source_definitions(
651                workspace_id=self.workspace_id,
652                api_root=self.api_root,
653                client_id=self.client_id,
654                client_secret=self.client_secret,
655            )
656            return [
657                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
658                for d in yaml_definitions
659            ]
660
661        raise NotImplementedError(
662            "Docker custom source definitions are not yet supported. "
663            "Only YAML manifest-based custom sources are currently available."
664        )
665
666    def get_custom_source_definition(
667        self,
668        definition_id: str,
669        *,
670        definition_type: Literal["yaml", "docker"],
671    ) -> CustomCloudSourceDefinition:
672        """Get a specific custom source definition by ID.
673
674        Args:
675            definition_id: The definition ID
676            definition_type: Connector type ("yaml" or "docker"). Required.
677
678        Returns:
679            CustomCloudSourceDefinition object
680        """
681        if definition_type == "yaml":
682            result = api_util.get_custom_yaml_source_definition(
683                workspace_id=self.workspace_id,
684                definition_id=definition_id,
685                api_root=self.api_root,
686                client_id=self.client_id,
687                client_secret=self.client_secret,
688            )
689            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
690
691        raise NotImplementedError(
692            "Docker custom source definitions are not yet supported. "
693            "Only YAML manifest-based custom sources are currently available."
694        )
@dataclass
class CloudWorkspace:
 65@dataclass
 66class CloudWorkspace:
 67    """A remote workspace on the Airbyte Cloud.
 68
 69    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 70    instances, both OSS and Enterprise.
 71    """
 72
 73    workspace_id: str
 74    client_id: SecretString
 75    client_secret: SecretString
 76    api_root: str = api_util.CLOUD_API_ROOT
 77
 78    def __post_init__(self) -> None:
 79        """Ensure that the client ID and secret are handled securely."""
 80        self.client_id = SecretString(self.client_id)
 81        self.client_secret = SecretString(self.client_secret)
 82
 83    @property
 84    def workspace_url(self) -> str | None:
 85        """The web URL of the workspace."""
 86        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 87
 88    @cached_property
 89    def _organization_info(self) -> dict[str, Any]:
 90        """Fetch and cache organization info for this workspace.
 91
 92        Uses the Config API endpoint for an efficient O(1) lookup.
 93        """
 94        return api_util.get_workspace_organization_info(
 95            workspace_id=self.workspace_id,
 96            api_root=self.api_root,
 97            client_id=self.client_id,
 98            client_secret=self.client_secret,
 99        )
100
101    @property
102    def organization_id(self) -> str | None:
103        """The ID of the organization this workspace belongs to.
104
105        This value is cached after the first lookup.
106        """
107        return self._organization_info.get("organizationId")
108
109    @property
110    def organization_name(self) -> str | None:
111        """The name of the organization this workspace belongs to.
112
113        This value is cached after the first lookup.
114        """
115        return self._organization_info.get("organizationName")
116
117    # Test connection and creds
118
119    def connect(self) -> None:
120        """Check that the workspace is reachable and raise an exception otherwise.
121
122        Note: It is not necessary to call this method before calling other operations. It
123              serves primarily as a simple check to ensure that the workspace is reachable
124              and credentials are correct.
125        """
126        _ = api_util.get_workspace(
127            api_root=self.api_root,
128            workspace_id=self.workspace_id,
129            client_id=self.client_id,
130            client_secret=self.client_secret,
131        )
132        print(f"Successfully connected to workspace: {self.workspace_url}")
133
134    # Get sources, destinations, and connections
135
136    def get_connection(
137        self,
138        connection_id: str,
139    ) -> CloudConnection:
140        """Get a connection by ID.
141
142        This method does not fetch data from the API. It returns a `CloudConnection` object,
143        which will be loaded lazily as needed.
144        """
145        return CloudConnection(
146            workspace=self,
147            connection_id=connection_id,
148        )
149
150    def get_source(
151        self,
152        source_id: str,
153    ) -> CloudSource:
154        """Get a source by ID.
155
156        This method does not fetch data from the API. It returns a `CloudSource` object,
157        which will be loaded lazily as needed.
158        """
159        return CloudSource(
160            workspace=self,
161            connector_id=source_id,
162        )
163
164    def get_destination(
165        self,
166        destination_id: str,
167    ) -> CloudDestination:
168        """Get a destination by ID.
169
170        This method does not fetch data from the API. It returns a `CloudDestination` object,
171        which will be loaded lazily as needed.
172        """
173        return CloudDestination(
174            workspace=self,
175            connector_id=destination_id,
176        )
177
178    # Deploy sources and destinations
179
180    def deploy_source(
181        self,
182        name: str,
183        source: Source,
184        *,
185        unique: bool = True,
186        random_name_suffix: bool = False,
187    ) -> CloudSource:
188        """Deploy a source to the workspace.
189
190        Returns the newly deployed source.
191
192        Args:
193            name: The name to use when deploying.
194            source: The source object to deploy.
195            unique: Whether to require a unique name. If `True`, duplicate names
196                are not allowed. Defaults to `True`.
197            random_name_suffix: Whether to append a random suffix to the name.
198        """
199        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
200        source_config_dict["sourceType"] = source.name.replace("source-", "")
201
202        if random_name_suffix:
203            name += f" (ID: {text_util.generate_random_suffix()})"
204
205        if unique:
206            existing = self.list_sources(name=name)
207            if existing:
208                raise exc.AirbyteDuplicateResourcesError(
209                    resource_type="source",
210                    resource_name=name,
211                )
212
213        deployed_source = api_util.create_source(
214            name=name,
215            api_root=self.api_root,
216            workspace_id=self.workspace_id,
217            config=source_config_dict,
218            client_id=self.client_id,
219            client_secret=self.client_secret,
220        )
221        return CloudSource(
222            workspace=self,
223            connector_id=deployed_source.source_id,
224        )
225
226    def deploy_destination(
227        self,
228        name: str,
229        destination: Destination | dict[str, Any],
230        *,
231        unique: bool = True,
232        random_name_suffix: bool = False,
233    ) -> CloudDestination:
234        """Deploy a destination to the workspace.
235
236        Returns the newly deployed destination ID.
237
238        Args:
239            name: The name to use when deploying.
240            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
241                dictionary of configuration values.
242            unique: Whether to require a unique name. If `True`, duplicate names
243                are not allowed. Defaults to `True`.
244            random_name_suffix: Whether to append a random suffix to the name.
245        """
246        if isinstance(destination, Destination):
247            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
248            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
249            # raise ValueError(destination_conf_dict)
250        else:
251            destination_conf_dict = destination.copy()
252            if "destinationType" not in destination_conf_dict:
253                raise exc.PyAirbyteInputError(
254                    message="Missing `destinationType` in configuration dictionary.",
255                )
256
257        if random_name_suffix:
258            name += f" (ID: {text_util.generate_random_suffix()})"
259
260        if unique:
261            existing = self.list_destinations(name=name)
262            if existing:
263                raise exc.AirbyteDuplicateResourcesError(
264                    resource_type="destination",
265                    resource_name=name,
266                )
267
268        deployed_destination = api_util.create_destination(
269            name=name,
270            api_root=self.api_root,
271            workspace_id=self.workspace_id,
272            config=destination_conf_dict,  # Wants a dataclass but accepts dict
273            client_id=self.client_id,
274            client_secret=self.client_secret,
275        )
276        return CloudDestination(
277            workspace=self,
278            connector_id=deployed_destination.destination_id,
279        )
280
281    def permanently_delete_source(
282        self,
283        source: str | CloudSource,
284        *,
285        safe_mode: bool = True,
286    ) -> None:
287        """Delete a source from the workspace.
288
289        You can pass either the source ID `str` or a deployed `Source` object.
290
291        Args:
292            source: The source ID or CloudSource object to delete
293            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
294                (case insensitive) to prevent accidental deletion. Defaults to True.
295        """
296        if not isinstance(source, (str, CloudSource)):
297            raise exc.PyAirbyteInputError(
298                message="Invalid source type.",
299                input_value=type(source).__name__,
300            )
301
302        api_util.delete_source(
303            source_id=source.connector_id if isinstance(source, CloudSource) else source,
304            source_name=source.name if isinstance(source, CloudSource) else None,
305            api_root=self.api_root,
306            client_id=self.client_id,
307            client_secret=self.client_secret,
308            safe_mode=safe_mode,
309        )
310
311    # Deploy and delete destinations
312
313    def permanently_delete_destination(
314        self,
315        destination: str | CloudDestination,
316        *,
317        safe_mode: bool = True,
318    ) -> None:
319        """Delete a deployed destination from the workspace.
320
321        You can pass either the `Cache` class or the deployed destination ID as a `str`.
322
323        Args:
324            destination: The destination ID or CloudDestination object to delete
325            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
326                (case insensitive) to prevent accidental deletion. Defaults to True.
327        """
328        if not isinstance(destination, (str, CloudDestination)):
329            raise exc.PyAirbyteInputError(
330                message="Invalid destination type.",
331                input_value=type(destination).__name__,
332            )
333
334        api_util.delete_destination(
335            destination_id=(
336                destination if isinstance(destination, str) else destination.destination_id
337            ),
338            destination_name=(
339                destination.name if isinstance(destination, CloudDestination) else None
340            ),
341            api_root=self.api_root,
342            client_id=self.client_id,
343            client_secret=self.client_secret,
344            safe_mode=safe_mode,
345        )
346
347    # Deploy and delete connections
348
349    def deploy_connection(
350        self,
351        connection_name: str,
352        *,
353        source: CloudSource | str,
354        selected_streams: list[str],
355        destination: CloudDestination | str,
356        table_prefix: str | None = None,
357    ) -> CloudConnection:
358        """Create a new connection between an already deployed source and destination.
359
360        Returns the newly deployed connection object.
361
362        Args:
363            connection_name: The name of the connection.
364            source: The deployed source. You can pass a source ID or a CloudSource object.
365            destination: The deployed destination. You can pass a destination ID or a
366                CloudDestination object.
367            table_prefix: Optional. The table prefix to use when syncing to the destination.
368            selected_streams: The selected stream names to sync within the connection.
369        """
370        if not selected_streams:
371            raise exc.PyAirbyteInputError(
372                guidance="You must provide `selected_streams` when creating a connection."
373            )
374
375        source_id: str = source if isinstance(source, str) else source.connector_id
376        destination_id: str = (
377            destination if isinstance(destination, str) else destination.connector_id
378        )
379
380        deployed_connection = api_util.create_connection(
381            name=connection_name,
382            source_id=source_id,
383            destination_id=destination_id,
384            api_root=self.api_root,
385            workspace_id=self.workspace_id,
386            selected_stream_names=selected_streams,
387            prefix=table_prefix or "",
388            client_id=self.client_id,
389            client_secret=self.client_secret,
390        )
391
392        return CloudConnection(
393            workspace=self,
394            connection_id=deployed_connection.connection_id,
395            source=deployed_connection.source_id,
396            destination=deployed_connection.destination_id,
397        )
398
399    def permanently_delete_connection(
400        self,
401        connection: str | CloudConnection,
402        *,
403        cascade_delete_source: bool = False,
404        cascade_delete_destination: bool = False,
405        safe_mode: bool = True,
406    ) -> None:
407        """Delete a deployed connection from the workspace.
408
409        Args:
410            connection: The connection ID or CloudConnection object to delete
411            cascade_delete_source: If True, also delete the source after deleting the connection
412            cascade_delete_destination: If True, also delete the destination after deleting
413                the connection
414            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
415                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
416                to cascade deletes.
417        """
418        if connection is None:
419            raise ValueError("No connection ID provided.")
420
421        if isinstance(connection, str):
422            connection = CloudConnection(
423                workspace=self,
424                connection_id=connection,
425            )
426
427        api_util.delete_connection(
428            connection_id=connection.connection_id,
429            connection_name=connection.name,
430            api_root=self.api_root,
431            workspace_id=self.workspace_id,
432            client_id=self.client_id,
433            client_secret=self.client_secret,
434            safe_mode=safe_mode,
435        )
436
437        if cascade_delete_source:
438            self.permanently_delete_source(
439                source=connection.source_id,
440                safe_mode=safe_mode,
441            )
442        if cascade_delete_destination:
443            self.permanently_delete_destination(
444                destination=connection.destination_id,
445                safe_mode=safe_mode,
446            )
447
448    # List sources, destinations, and connections
449
450    def list_connections(
451        self,
452        name: str | None = None,
453        *,
454        name_filter: Callable | None = None,
455    ) -> list[CloudConnection]:
456        """List connections by name in the workspace.
457
458        TODO: Add pagination support
459        """
460        connections = api_util.list_connections(
461            api_root=self.api_root,
462            workspace_id=self.workspace_id,
463            name=name,
464            name_filter=name_filter,
465            client_id=self.client_id,
466            client_secret=self.client_secret,
467        )
468        return [
469            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
470                workspace=self,
471                connection_response=connection,
472            )
473            for connection in connections
474            if name is None or connection.name == name
475        ]
476
477    def list_sources(
478        self,
479        name: str | None = None,
480        *,
481        name_filter: Callable | None = None,
482    ) -> list[CloudSource]:
483        """List all sources in the workspace.
484
485        TODO: Add pagination support
486        """
487        sources = api_util.list_sources(
488            api_root=self.api_root,
489            workspace_id=self.workspace_id,
490            name=name,
491            name_filter=name_filter,
492            client_id=self.client_id,
493            client_secret=self.client_secret,
494        )
495        return [
496            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
497                workspace=self,
498                source_response=source,
499            )
500            for source in sources
501            if name is None or source.name == name
502        ]
503
504    def list_destinations(
505        self,
506        name: str | None = None,
507        *,
508        name_filter: Callable | None = None,
509    ) -> list[CloudDestination]:
510        """List all destinations in the workspace.
511
512        TODO: Add pagination support
513        """
514        destinations = api_util.list_destinations(
515            api_root=self.api_root,
516            workspace_id=self.workspace_id,
517            name=name,
518            name_filter=name_filter,
519            client_id=self.client_id,
520            client_secret=self.client_secret,
521        )
522        return [
523            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
524                workspace=self,
525                destination_response=destination,
526            )
527            for destination in destinations
528            if name is None or destination.name == name
529        ]
530
531    def publish_custom_source_definition(
532        self,
533        name: str,
534        *,
535        manifest_yaml: dict[str, Any] | Path | str | None = None,
536        docker_image: str | None = None,
537        docker_tag: str | None = None,
538        unique: bool = True,
539        pre_validate: bool = True,
540        testing_values: dict[str, Any] | None = None,
541    ) -> CustomCloudSourceDefinition:
542        """Publish a custom source connector definition.
543
544        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
545        and docker_tag (for Docker connectors), but not both.
546
547        Args:
548            name: Display name for the connector definition
549            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
550            docker_image: Docker repository (e.g., 'airbyte/source-custom')
551            docker_tag: Docker image tag (e.g., '1.0.0')
552            unique: Whether to enforce name uniqueness
553            pre_validate: Whether to validate manifest client-side (YAML only)
554            testing_values: Optional configuration values to use for testing in the
555                Connector Builder UI. If provided, these values are stored as the complete
556                testing values object for the connector builder project (replaces any existing
557                values), allowing immediate test read operations.
558
559        Returns:
560            CustomCloudSourceDefinition object representing the created definition
561
562        Raises:
563            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
564            AirbyteDuplicateResourcesError: If unique=True and name already exists
565        """
566        is_yaml = manifest_yaml is not None
567        is_docker = docker_image is not None
568
569        if is_yaml == is_docker:
570            raise exc.PyAirbyteInputError(
571                message=(
572                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
573                    "docker_image + docker_tag (for Docker connectors), but not both"
574                ),
575                context={
576                    "manifest_yaml_provided": is_yaml,
577                    "docker_image_provided": is_docker,
578                },
579            )
580
581        if is_docker and docker_tag is None:
582            raise exc.PyAirbyteInputError(
583                message="docker_tag is required when docker_image is specified",
584                context={"docker_image": docker_image},
585            )
586
587        if unique:
588            existing = self.list_custom_source_definitions(
589                definition_type="yaml" if is_yaml else "docker",
590            )
591            if any(d.name == name for d in existing):
592                raise exc.AirbyteDuplicateResourcesError(
593                    resource_type="custom_source_definition",
594                    resource_name=name,
595                )
596
597        if is_yaml:
598            manifest_dict: dict[str, Any]
599            if isinstance(manifest_yaml, Path):
600                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
601            elif isinstance(manifest_yaml, str):
602                manifest_dict = yaml.safe_load(manifest_yaml)
603            elif manifest_yaml is not None:
604                manifest_dict = manifest_yaml
605            else:
606                raise exc.PyAirbyteInputError(
607                    message="manifest_yaml is required for YAML connectors",
608                    context={"name": name},
609                )
610
611            if pre_validate:
612                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
613
614            result = api_util.create_custom_yaml_source_definition(
615                name=name,
616                workspace_id=self.workspace_id,
617                manifest=manifest_dict,
618                api_root=self.api_root,
619                client_id=self.client_id,
620                client_secret=self.client_secret,
621            )
622            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
623                self, result
624            )
625
626            # Set testing values if provided
627            if testing_values is not None:
628                custom_definition.set_testing_values(testing_values)
629
630            return custom_definition
631
632        raise NotImplementedError(
633            "Docker custom source definitions are not yet supported. "
634            "Only YAML manifest-based custom sources are currently available."
635        )
636
637    def list_custom_source_definitions(
638        self,
639        *,
640        definition_type: Literal["yaml", "docker"],
641    ) -> list[CustomCloudSourceDefinition]:
642        """List custom source connector definitions.
643
644        Args:
645            definition_type: Connector type to list ("yaml" or "docker"). Required.
646
647        Returns:
648            List of CustomCloudSourceDefinition objects matching the specified type
649        """
650        if definition_type == "yaml":
651            yaml_definitions = api_util.list_custom_yaml_source_definitions(
652                workspace_id=self.workspace_id,
653                api_root=self.api_root,
654                client_id=self.client_id,
655                client_secret=self.client_secret,
656            )
657            return [
658                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
659                for d in yaml_definitions
660            ]
661
662        raise NotImplementedError(
663            "Docker custom source definitions are not yet supported. "
664            "Only YAML manifest-based custom sources are currently available."
665        )
666
667    def get_custom_source_definition(
668        self,
669        definition_id: str,
670        *,
671        definition_type: Literal["yaml", "docker"],
672    ) -> CustomCloudSourceDefinition:
673        """Get a specific custom source definition by ID.
674
675        Args:
676            definition_id: The definition ID
677            definition_type: Connector type ("yaml" or "docker"). Required.
678
679        Returns:
680            CustomCloudSourceDefinition object
681        """
682        if definition_type == "yaml":
683            result = api_util.get_custom_yaml_source_definition(
684                workspace_id=self.workspace_id,
685                definition_id=definition_id,
686                api_root=self.api_root,
687                client_id=self.client_id,
688                client_secret=self.client_secret,
689            )
690            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
691
692        raise NotImplementedError(
693            "Docker custom source definitions are not yet supported. "
694            "Only YAML manifest-based custom sources are currently available."
695        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
83    @property
84    def workspace_url(self) -> str | None:
85        """The web URL of the workspace."""
86        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

organization_id: str | None
101    @property
102    def organization_id(self) -> str | None:
103        """The ID of the organization this workspace belongs to.
104
105        This value is cached after the first lookup.
106        """
107        return self._organization_info.get("organizationId")

The ID of the organization this workspace belongs to.

This value is cached after the first lookup.

organization_name: str | None
109    @property
110    def organization_name(self) -> str | None:
111        """The name of the organization this workspace belongs to.
112
113        This value is cached after the first lookup.
114        """
115        return self._organization_info.get("organizationName")

The name of the organization this workspace belongs to.

This value is cached after the first lookup.

def connect(self) -> None:
119    def connect(self) -> None:
120        """Check that the workspace is reachable and raise an exception otherwise.
121
122        Note: It is not necessary to call this method before calling other operations. It
123              serves primarily as a simple check to ensure that the workspace is reachable
124              and credentials are correct.
125        """
126        _ = api_util.get_workspace(
127            api_root=self.api_root,
128            workspace_id=self.workspace_id,
129            client_id=self.client_id,
130            client_secret=self.client_secret,
131        )
132        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
136    def get_connection(
137        self,
138        connection_id: str,
139    ) -> CloudConnection:
140        """Get a connection by ID.
141
142        This method does not fetch data from the API. It returns a `CloudConnection` object,
143        which will be loaded lazily as needed.
144        """
145        return CloudConnection(
146            workspace=self,
147            connection_id=connection_id,
148        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
150    def get_source(
151        self,
152        source_id: str,
153    ) -> CloudSource:
154        """Get a source by ID.
155
156        This method does not fetch data from the API. It returns a `CloudSource` object,
157        which will be loaded lazily as needed.
158        """
159        return CloudSource(
160            workspace=self,
161            connector_id=source_id,
162        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
164    def get_destination(
165        self,
166        destination_id: str,
167    ) -> CloudDestination:
168        """Get a destination by ID.
169
170        This method does not fetch data from the API. It returns a `CloudDestination` object,
171        which will be loaded lazily as needed.
172        """
173        return CloudDestination(
174            workspace=self,
175            connector_id=destination_id,
176        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
180    def deploy_source(
181        self,
182        name: str,
183        source: Source,
184        *,
185        unique: bool = True,
186        random_name_suffix: bool = False,
187    ) -> CloudSource:
188        """Deploy a source to the workspace.
189
190        Returns the newly deployed source.
191
192        Args:
193            name: The name to use when deploying.
194            source: The source object to deploy.
195            unique: Whether to require a unique name. If `True`, duplicate names
196                are not allowed. Defaults to `True`.
197            random_name_suffix: Whether to append a random suffix to the name.
198        """
199        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
200        source_config_dict["sourceType"] = source.name.replace("source-", "")
201
202        if random_name_suffix:
203            name += f" (ID: {text_util.generate_random_suffix()})"
204
205        if unique:
206            existing = self.list_sources(name=name)
207            if existing:
208                raise exc.AirbyteDuplicateResourcesError(
209                    resource_type="source",
210                    resource_name=name,
211                )
212
213        deployed_source = api_util.create_source(
214            name=name,
215            api_root=self.api_root,
216            workspace_id=self.workspace_id,
217            config=source_config_dict,
218            client_id=self.client_id,
219            client_secret=self.client_secret,
220        )
221        return CloudSource(
222            workspace=self,
223            connector_id=deployed_source.source_id,
224        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
226    def deploy_destination(
227        self,
228        name: str,
229        destination: Destination | dict[str, Any],
230        *,
231        unique: bool = True,
232        random_name_suffix: bool = False,
233    ) -> CloudDestination:
234        """Deploy a destination to the workspace.
235
236        Returns the newly deployed destination ID.
237
238        Args:
239            name: The name to use when deploying.
240            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
241                dictionary of configuration values.
242            unique: Whether to require a unique name. If `True`, duplicate names
243                are not allowed. Defaults to `True`.
244            random_name_suffix: Whether to append a random suffix to the name.
245        """
246        if isinstance(destination, Destination):
247            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
248            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
249            # raise ValueError(destination_conf_dict)
250        else:
251            destination_conf_dict = destination.copy()
252            if "destinationType" not in destination_conf_dict:
253                raise exc.PyAirbyteInputError(
254                    message="Missing `destinationType` in configuration dictionary.",
255                )
256
257        if random_name_suffix:
258            name += f" (ID: {text_util.generate_random_suffix()})"
259
260        if unique:
261            existing = self.list_destinations(name=name)
262            if existing:
263                raise exc.AirbyteDuplicateResourcesError(
264                    resource_type="destination",
265                    resource_name=name,
266                )
267
268        deployed_destination = api_util.create_destination(
269            name=name,
270            api_root=self.api_root,
271            workspace_id=self.workspace_id,
272            config=destination_conf_dict,  # Wants a dataclass but accepts dict
273            client_id=self.client_id,
274            client_secret=self.client_secret,
275        )
276        return CloudDestination(
277            workspace=self,
278            connector_id=deployed_destination.destination_id,
279        )

Deploy a destination to the workspace.

Returns the newly deployed destination ID.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
281    def permanently_delete_source(
282        self,
283        source: str | CloudSource,
284        *,
285        safe_mode: bool = True,
286    ) -> None:
287        """Delete a source from the workspace.
288
289        You can pass either the source ID `str` or a deployed `Source` object.
290
291        Args:
292            source: The source ID or CloudSource object to delete
293            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
294                (case insensitive) to prevent accidental deletion. Defaults to True.
295        """
296        if not isinstance(source, (str, CloudSource)):
297            raise exc.PyAirbyteInputError(
298                message="Invalid source type.",
299                input_value=type(source).__name__,
300            )
301
302        api_util.delete_source(
303            source_id=source.connector_id if isinstance(source, CloudSource) else source,
304            source_name=source.name if isinstance(source, CloudSource) else None,
305            api_root=self.api_root,
306            client_id=self.client_id,
307            client_secret=self.client_secret,
308            safe_mode=safe_mode,
309        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
313    def permanently_delete_destination(
314        self,
315        destination: str | CloudDestination,
316        *,
317        safe_mode: bool = True,
318    ) -> None:
319        """Delete a deployed destination from the workspace.
320
321        You can pass either the `Cache` class or the deployed destination ID as a `str`.
322
323        Args:
324            destination: The destination ID or CloudDestination object to delete
325            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
326                (case insensitive) to prevent accidental deletion. Defaults to True.
327        """
328        if not isinstance(destination, (str, CloudDestination)):
329            raise exc.PyAirbyteInputError(
330                message="Invalid destination type.",
331                input_value=type(destination).__name__,
332            )
333
334        api_util.delete_destination(
335            destination_id=(
336                destination if isinstance(destination, str) else destination.destination_id
337            ),
338            destination_name=(
339                destination.name if isinstance(destination, CloudDestination) else None
340            ),
341            api_root=self.api_root,
342            client_id=self.client_id,
343            client_secret=self.client_secret,
344            safe_mode=safe_mode,
345        )

Delete a deployed destination from the workspace.

You can pass either the Cache class or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
349    def deploy_connection(
350        self,
351        connection_name: str,
352        *,
353        source: CloudSource | str,
354        selected_streams: list[str],
355        destination: CloudDestination | str,
356        table_prefix: str | None = None,
357    ) -> CloudConnection:
358        """Create a new connection between an already deployed source and destination.
359
360        Returns the newly deployed connection object.
361
362        Args:
363            connection_name: The name of the connection.
364            source: The deployed source. You can pass a source ID or a CloudSource object.
365            destination: The deployed destination. You can pass a destination ID or a
366                CloudDestination object.
367            table_prefix: Optional. The table prefix to use when syncing to the destination.
368            selected_streams: The selected stream names to sync within the connection.
369        """
370        if not selected_streams:
371            raise exc.PyAirbyteInputError(
372                guidance="You must provide `selected_streams` when creating a connection."
373            )
374
375        source_id: str = source if isinstance(source, str) else source.connector_id
376        destination_id: str = (
377            destination if isinstance(destination, str) else destination.connector_id
378        )
379
380        deployed_connection = api_util.create_connection(
381            name=connection_name,
382            source_id=source_id,
383            destination_id=destination_id,
384            api_root=self.api_root,
385            workspace_id=self.workspace_id,
386            selected_stream_names=selected_streams,
387            prefix=table_prefix or "",
388            client_id=self.client_id,
389            client_secret=self.client_secret,
390        )
391
392        return CloudConnection(
393            workspace=self,
394            connection_id=deployed_connection.connection_id,
395            source=deployed_connection.source_id,
396            destination=deployed_connection.destination_id,
397        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
399    def permanently_delete_connection(
400        self,
401        connection: str | CloudConnection,
402        *,
403        cascade_delete_source: bool = False,
404        cascade_delete_destination: bool = False,
405        safe_mode: bool = True,
406    ) -> None:
407        """Delete a deployed connection from the workspace.
408
409        Args:
410            connection: The connection ID or CloudConnection object to delete
411            cascade_delete_source: If True, also delete the source after deleting the connection
412            cascade_delete_destination: If True, also delete the destination after deleting
413                the connection
414            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
415                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
416                to cascade deletes.
417        """
418        if connection is None:
419            raise ValueError("No connection ID provided.")
420
421        if isinstance(connection, str):
422            connection = CloudConnection(
423                workspace=self,
424                connection_id=connection,
425            )
426
427        api_util.delete_connection(
428            connection_id=connection.connection_id,
429            connection_name=connection.name,
430            api_root=self.api_root,
431            workspace_id=self.workspace_id,
432            client_id=self.client_id,
433            client_secret=self.client_secret,
434            safe_mode=safe_mode,
435        )
436
437        if cascade_delete_source:
438            self.permanently_delete_source(
439                source=connection.source_id,
440                safe_mode=safe_mode,
441            )
442        if cascade_delete_destination:
443            self.permanently_delete_destination(
444                destination=connection.destination_id,
445                safe_mode=safe_mode,
446            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
450    def list_connections(
451        self,
452        name: str | None = None,
453        *,
454        name_filter: Callable | None = None,
455    ) -> list[CloudConnection]:
456        """List connections by name in the workspace.
457
458        TODO: Add pagination support
459        """
460        connections = api_util.list_connections(
461            api_root=self.api_root,
462            workspace_id=self.workspace_id,
463            name=name,
464            name_filter=name_filter,
465            client_id=self.client_id,
466            client_secret=self.client_secret,
467        )
468        return [
469            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
470                workspace=self,
471                connection_response=connection,
472            )
473            for connection in connections
474            if name is None or connection.name == name
475        ]

List connections by name in the workspace.

TODO: Add pagination support

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
477    def list_sources(
478        self,
479        name: str | None = None,
480        *,
481        name_filter: Callable | None = None,
482    ) -> list[CloudSource]:
483        """List all sources in the workspace.
484
485        TODO: Add pagination support
486        """
487        sources = api_util.list_sources(
488            api_root=self.api_root,
489            workspace_id=self.workspace_id,
490            name=name,
491            name_filter=name_filter,
492            client_id=self.client_id,
493            client_secret=self.client_secret,
494        )
495        return [
496            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
497                workspace=self,
498                source_response=source,
499            )
500            for source in sources
501            if name is None or source.name == name
502        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
504    def list_destinations(
505        self,
506        name: str | None = None,
507        *,
508        name_filter: Callable | None = None,
509    ) -> list[CloudDestination]:
510        """List all destinations in the workspace.
511
512        TODO: Add pagination support
513        """
514        destinations = api_util.list_destinations(
515            api_root=self.api_root,
516            workspace_id=self.workspace_id,
517            name=name,
518            name_filter=name_filter,
519            client_id=self.client_id,
520            client_secret=self.client_secret,
521        )
522        return [
523            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
524                workspace=self,
525                destination_response=destination,
526            )
527            for destination in destinations
528            if name is None or destination.name == name
529        ]

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
531    def publish_custom_source_definition(
532        self,
533        name: str,
534        *,
535        manifest_yaml: dict[str, Any] | Path | str | None = None,
536        docker_image: str | None = None,
537        docker_tag: str | None = None,
538        unique: bool = True,
539        pre_validate: bool = True,
540        testing_values: dict[str, Any] | None = None,
541    ) -> CustomCloudSourceDefinition:
542        """Publish a custom source connector definition.
543
544        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
545        and docker_tag (for Docker connectors), but not both.
546
547        Args:
548            name: Display name for the connector definition
549            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
550            docker_image: Docker repository (e.g., 'airbyte/source-custom')
551            docker_tag: Docker image tag (e.g., '1.0.0')
552            unique: Whether to enforce name uniqueness
553            pre_validate: Whether to validate manifest client-side (YAML only)
554            testing_values: Optional configuration values to use for testing in the
555                Connector Builder UI. If provided, these values are stored as the complete
556                testing values object for the connector builder project (replaces any existing
557                values), allowing immediate test read operations.
558
559        Returns:
560            CustomCloudSourceDefinition object representing the created definition
561
562        Raises:
563            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
564            AirbyteDuplicateResourcesError: If unique=True and name already exists
565        """
566        is_yaml = manifest_yaml is not None
567        is_docker = docker_image is not None
568
569        if is_yaml == is_docker:
570            raise exc.PyAirbyteInputError(
571                message=(
572                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
573                    "docker_image + docker_tag (for Docker connectors), but not both"
574                ),
575                context={
576                    "manifest_yaml_provided": is_yaml,
577                    "docker_image_provided": is_docker,
578                },
579            )
580
581        if is_docker and docker_tag is None:
582            raise exc.PyAirbyteInputError(
583                message="docker_tag is required when docker_image is specified",
584                context={"docker_image": docker_image},
585            )
586
587        if unique:
588            existing = self.list_custom_source_definitions(
589                definition_type="yaml" if is_yaml else "docker",
590            )
591            if any(d.name == name for d in existing):
592                raise exc.AirbyteDuplicateResourcesError(
593                    resource_type="custom_source_definition",
594                    resource_name=name,
595                )
596
597        if is_yaml:
598            manifest_dict: dict[str, Any]
599            if isinstance(manifest_yaml, Path):
600                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
601            elif isinstance(manifest_yaml, str):
602                manifest_dict = yaml.safe_load(manifest_yaml)
603            elif manifest_yaml is not None:
604                manifest_dict = manifest_yaml
605            else:
606                raise exc.PyAirbyteInputError(
607                    message="manifest_yaml is required for YAML connectors",
608                    context={"name": name},
609                )
610
611            if pre_validate:
612                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
613
614            result = api_util.create_custom_yaml_source_definition(
615                name=name,
616                workspace_id=self.workspace_id,
617                manifest=manifest_dict,
618                api_root=self.api_root,
619                client_id=self.client_id,
620                client_secret=self.client_secret,
621            )
622            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
623                self, result
624            )
625
626            # Set testing values if provided
627            if testing_values is not None:
628                custom_definition.set_testing_values(testing_values)
629
630            return custom_definition
631
632        raise NotImplementedError(
633            "Docker custom source definitions are not yet supported. "
634            "Only YAML manifest-based custom sources are currently available."
635        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
637    def list_custom_source_definitions(
638        self,
639        *,
640        definition_type: Literal["yaml", "docker"],
641    ) -> list[CustomCloudSourceDefinition]:
642        """List custom source connector definitions.
643
644        Args:
645            definition_type: Connector type to list ("yaml" or "docker"). Required.
646
647        Returns:
648            List of CustomCloudSourceDefinition objects matching the specified type
649        """
650        if definition_type == "yaml":
651            yaml_definitions = api_util.list_custom_yaml_source_definitions(
652                workspace_id=self.workspace_id,
653                api_root=self.api_root,
654                client_id=self.client_id,
655                client_secret=self.client_secret,
656            )
657            return [
658                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
659                for d in yaml_definitions
660            ]
661
662        raise NotImplementedError(
663            "Docker custom source definitions are not yet supported. "
664            "Only YAML manifest-based custom sources are currently available."
665        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
667    def get_custom_source_definition(
668        self,
669        definition_id: str,
670        *,
671        definition_type: Literal["yaml", "docker"],
672    ) -> CustomCloudSourceDefinition:
673        """Get a specific custom source definition by ID.
674
675        Args:
676            definition_id: The definition ID
677            definition_type: Connector type ("yaml" or "docker"). Required.
678
679        Returns:
680            CustomCloudSourceDefinition object
681        """
682        if definition_type == "yaml":
683            result = api_util.get_custom_yaml_source_definition(
684                workspace_id=self.workspace_id,
685                definition_id=definition_id,
686                api_root=self.api_root,
687                client_id=self.client_id,
688                client_secret=self.client_secret,
689            )
690            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
691
692        raise NotImplementedError(
693            "Docker custom source definitions are not yet supported. "
694            "Only YAML manifest-based custom sources are currently available."
695        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object