airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass
 39from pathlib import Path
 40from typing import TYPE_CHECKING, Any, Literal
 41
 42import yaml
 43
 44from airbyte import exceptions as exc
 45from airbyte._util import api_util, text_util
 46from airbyte._util.api_util import get_web_url_root
 47from airbyte.cloud.connections import CloudConnection
 48from airbyte.cloud.connectors import (
 49    CloudDestination,
 50    CloudSource,
 51    CustomCloudSourceDefinition,
 52)
 53from airbyte.destinations.base import Destination
 54from airbyte.secrets.base import SecretString
 55
 56
 57if TYPE_CHECKING:
 58    from collections.abc import Callable
 59
 60    from airbyte.sources.base import Source
 61
 62
 63@dataclass
 64class CloudWorkspace:
 65    """A remote workspace on the Airbyte Cloud.
 66
 67    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 68    instances, both OSS and Enterprise.
 69    """
 70
 71    workspace_id: str
 72    client_id: SecretString
 73    client_secret: SecretString
 74    api_root: str = api_util.CLOUD_API_ROOT
 75
 76    def __post_init__(self) -> None:
 77        """Ensure that the client ID and secret are handled securely."""
 78        self.client_id = SecretString(self.client_id)
 79        self.client_secret = SecretString(self.client_secret)
 80
 81    @property
 82    def workspace_url(self) -> str | None:
 83        """The web URL of the workspace."""
 84        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 85
 86    # Test connection and creds
 87
 88    def connect(self) -> None:
 89        """Check that the workspace is reachable and raise an exception otherwise.
 90
 91        Note: It is not necessary to call this method before calling other operations. It
 92              serves primarily as a simple check to ensure that the workspace is reachable
 93              and credentials are correct.
 94        """
 95        _ = api_util.get_workspace(
 96            api_root=self.api_root,
 97            workspace_id=self.workspace_id,
 98            client_id=self.client_id,
 99            client_secret=self.client_secret,
100        )
101        print(f"Successfully connected to workspace: {self.workspace_url}")
102
103    # Get sources, destinations, and connections
104
105    def get_connection(
106        self,
107        connection_id: str,
108    ) -> CloudConnection:
109        """Get a connection by ID.
110
111        This method does not fetch data from the API. It returns a `CloudConnection` object,
112        which will be loaded lazily as needed.
113        """
114        return CloudConnection(
115            workspace=self,
116            connection_id=connection_id,
117        )
118
119    def get_source(
120        self,
121        source_id: str,
122    ) -> CloudSource:
123        """Get a source by ID.
124
125        This method does not fetch data from the API. It returns a `CloudSource` object,
126        which will be loaded lazily as needed.
127        """
128        return CloudSource(
129            workspace=self,
130            connector_id=source_id,
131        )
132
133    def get_destination(
134        self,
135        destination_id: str,
136    ) -> CloudDestination:
137        """Get a destination by ID.
138
139        This method does not fetch data from the API. It returns a `CloudDestination` object,
140        which will be loaded lazily as needed.
141        """
142        return CloudDestination(
143            workspace=self,
144            connector_id=destination_id,
145        )
146
147    # Deploy sources and destinations
148
149    def deploy_source(
150        self,
151        name: str,
152        source: Source,
153        *,
154        unique: bool = True,
155        random_name_suffix: bool = False,
156    ) -> CloudSource:
157        """Deploy a source to the workspace.
158
159        Returns the newly deployed source.
160
161        Args:
162            name: The name to use when deploying.
163            source: The source object to deploy.
164            unique: Whether to require a unique name. If `True`, duplicate names
165                are not allowed. Defaults to `True`.
166            random_name_suffix: Whether to append a random suffix to the name.
167        """
168        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
169        source_config_dict["sourceType"] = source.name.replace("source-", "")
170
171        if random_name_suffix:
172            name += f" (ID: {text_util.generate_random_suffix()})"
173
174        if unique:
175            existing = self.list_sources(name=name)
176            if existing:
177                raise exc.AirbyteDuplicateResourcesError(
178                    resource_type="source",
179                    resource_name=name,
180                )
181
182        deployed_source = api_util.create_source(
183            name=name,
184            api_root=self.api_root,
185            workspace_id=self.workspace_id,
186            config=source_config_dict,
187            client_id=self.client_id,
188            client_secret=self.client_secret,
189        )
190        return CloudSource(
191            workspace=self,
192            connector_id=deployed_source.source_id,
193        )
194
195    def deploy_destination(
196        self,
197        name: str,
198        destination: Destination | dict[str, Any],
199        *,
200        unique: bool = True,
201        random_name_suffix: bool = False,
202    ) -> CloudDestination:
203        """Deploy a destination to the workspace.
204
205        Returns the newly deployed destination ID.
206
207        Args:
208            name: The name to use when deploying.
209            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
210                dictionary of configuration values.
211            unique: Whether to require a unique name. If `True`, duplicate names
212                are not allowed. Defaults to `True`.
213            random_name_suffix: Whether to append a random suffix to the name.
214        """
215        if isinstance(destination, Destination):
216            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
217            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
218            # raise ValueError(destination_conf_dict)
219        else:
220            destination_conf_dict = destination.copy()
221            if "destinationType" not in destination_conf_dict:
222                raise exc.PyAirbyteInputError(
223                    message="Missing `destinationType` in configuration dictionary.",
224                )
225
226        if random_name_suffix:
227            name += f" (ID: {text_util.generate_random_suffix()})"
228
229        if unique:
230            existing = self.list_destinations(name=name)
231            if existing:
232                raise exc.AirbyteDuplicateResourcesError(
233                    resource_type="destination",
234                    resource_name=name,
235                )
236
237        deployed_destination = api_util.create_destination(
238            name=name,
239            api_root=self.api_root,
240            workspace_id=self.workspace_id,
241            config=destination_conf_dict,  # Wants a dataclass but accepts dict
242            client_id=self.client_id,
243            client_secret=self.client_secret,
244        )
245        return CloudDestination(
246            workspace=self,
247            connector_id=deployed_destination.destination_id,
248        )
249
250    def permanently_delete_source(
251        self,
252        source: str | CloudSource,
253    ) -> None:
254        """Delete a source from the workspace.
255
256        You can pass either the source ID `str` or a deployed `Source` object.
257        """
258        if not isinstance(source, (str, CloudSource)):
259            raise exc.PyAirbyteInputError(
260                message="Invalid source type.",
261                input_value=type(source).__name__,
262            )
263
264        api_util.delete_source(
265            source_id=source.connector_id if isinstance(source, CloudSource) else source,
266            api_root=self.api_root,
267            client_id=self.client_id,
268            client_secret=self.client_secret,
269        )
270
271    # Deploy and delete destinations
272
273    def permanently_delete_destination(
274        self,
275        destination: str | CloudDestination,
276    ) -> None:
277        """Delete a deployed destination from the workspace.
278
279        You can pass either the `Cache` class or the deployed destination ID as a `str`.
280        """
281        if not isinstance(destination, (str, CloudDestination)):
282            raise exc.PyAirbyteInputError(
283                message="Invalid destination type.",
284                input_value=type(destination).__name__,
285            )
286
287        api_util.delete_destination(
288            destination_id=(
289                destination if isinstance(destination, str) else destination.destination_id
290            ),
291            api_root=self.api_root,
292            client_id=self.client_id,
293            client_secret=self.client_secret,
294        )
295
296    # Deploy and delete connections
297
298    def deploy_connection(
299        self,
300        connection_name: str,
301        *,
302        source: CloudSource | str,
303        selected_streams: list[str],
304        destination: CloudDestination | str,
305        table_prefix: str | None = None,
306    ) -> CloudConnection:
307        """Create a new connection between an already deployed source and destination.
308
309        Returns the newly deployed connection object.
310
311        Args:
312            connection_name: The name of the connection.
313            source: The deployed source. You can pass a source ID or a CloudSource object.
314            destination: The deployed destination. You can pass a destination ID or a
315                CloudDestination object.
316            table_prefix: Optional. The table prefix to use when syncing to the destination.
317            selected_streams: The selected stream names to sync within the connection.
318        """
319        if not selected_streams:
320            raise exc.PyAirbyteInputError(
321                guidance="You must provide `selected_streams` when creating a connection."
322            )
323
324        source_id: str = source if isinstance(source, str) else source.connector_id
325        destination_id: str = (
326            destination if isinstance(destination, str) else destination.connector_id
327        )
328
329        deployed_connection = api_util.create_connection(
330            name=connection_name,
331            source_id=source_id,
332            destination_id=destination_id,
333            api_root=self.api_root,
334            workspace_id=self.workspace_id,
335            selected_stream_names=selected_streams,
336            prefix=table_prefix or "",
337            client_id=self.client_id,
338            client_secret=self.client_secret,
339        )
340
341        return CloudConnection(
342            workspace=self,
343            connection_id=deployed_connection.connection_id,
344            source=deployed_connection.source_id,
345            destination=deployed_connection.destination_id,
346        )
347
348    def permanently_delete_connection(
349        self,
350        connection: str | CloudConnection,
351        *,
352        cascade_delete_source: bool = False,
353        cascade_delete_destination: bool = False,
354    ) -> None:
355        """Delete a deployed connection from the workspace."""
356        if connection is None:
357            raise ValueError("No connection ID provided.")
358
359        if isinstance(connection, str):
360            connection = CloudConnection(
361                workspace=self,
362                connection_id=connection,
363            )
364
365        api_util.delete_connection(
366            connection_id=connection.connection_id,
367            api_root=self.api_root,
368            workspace_id=self.workspace_id,
369            client_id=self.client_id,
370            client_secret=self.client_secret,
371        )
372
373        if cascade_delete_source:
374            self.permanently_delete_source(source=connection.source_id)
375        if cascade_delete_destination:
376            self.permanently_delete_destination(destination=connection.destination_id)
377
378    # List sources, destinations, and connections
379
380    def list_connections(
381        self,
382        name: str | None = None,
383        *,
384        name_filter: Callable | None = None,
385    ) -> list[CloudConnection]:
386        """List connections by name in the workspace.
387
388        TODO: Add pagination support
389        """
390        connections = api_util.list_connections(
391            api_root=self.api_root,
392            workspace_id=self.workspace_id,
393            name=name,
394            name_filter=name_filter,
395            client_id=self.client_id,
396            client_secret=self.client_secret,
397        )
398        return [
399            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
400                workspace=self,
401                connection_response=connection,
402            )
403            for connection in connections
404            if name is None or connection.name == name
405        ]
406
407    def list_sources(
408        self,
409        name: str | None = None,
410        *,
411        name_filter: Callable | None = None,
412    ) -> list[CloudSource]:
413        """List all sources in the workspace.
414
415        TODO: Add pagination support
416        """
417        sources = api_util.list_sources(
418            api_root=self.api_root,
419            workspace_id=self.workspace_id,
420            name=name,
421            name_filter=name_filter,
422            client_id=self.client_id,
423            client_secret=self.client_secret,
424        )
425        return [
426            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
427                workspace=self,
428                source_response=source,
429            )
430            for source in sources
431            if name is None or source.name == name
432        ]
433
434    def list_destinations(
435        self,
436        name: str | None = None,
437        *,
438        name_filter: Callable | None = None,
439    ) -> list[CloudDestination]:
440        """List all destinations in the workspace.
441
442        TODO: Add pagination support
443        """
444        destinations = api_util.list_destinations(
445            api_root=self.api_root,
446            workspace_id=self.workspace_id,
447            name=name,
448            name_filter=name_filter,
449            client_id=self.client_id,
450            client_secret=self.client_secret,
451        )
452        return [
453            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
454                workspace=self,
455                destination_response=destination,
456            )
457            for destination in destinations
458            if name is None or destination.name == name
459        ]
460
461    def publish_custom_source_definition(
462        self,
463        name: str,
464        *,
465        manifest_yaml: dict[str, Any] | Path | str | None = None,
466        docker_image: str | None = None,
467        docker_tag: str | None = None,
468        unique: bool = True,
469        pre_validate: bool = True,
470    ) -> CustomCloudSourceDefinition:
471        """Publish a custom source connector definition.
472
473        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
474        and docker_tag (for Docker connectors), but not both.
475
476        Args:
477            name: Display name for the connector definition
478            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
479            docker_image: Docker repository (e.g., 'airbyte/source-custom')
480            docker_tag: Docker image tag (e.g., '1.0.0')
481            unique: Whether to enforce name uniqueness
482            pre_validate: Whether to validate manifest client-side (YAML only)
483
484        Returns:
485            CustomCloudSourceDefinition object representing the created definition
486
487        Raises:
488            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
489            AirbyteDuplicateResourcesError: If unique=True and name already exists
490        """
491        is_yaml = manifest_yaml is not None
492        is_docker = docker_image is not None
493
494        if is_yaml == is_docker:
495            raise exc.PyAirbyteInputError(
496                message=(
497                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
498                    "docker_image + docker_tag (for Docker connectors), but not both"
499                ),
500                context={
501                    "manifest_yaml_provided": is_yaml,
502                    "docker_image_provided": is_docker,
503                },
504            )
505
506        if is_docker and docker_tag is None:
507            raise exc.PyAirbyteInputError(
508                message="docker_tag is required when docker_image is specified",
509                context={"docker_image": docker_image},
510            )
511
512        if unique:
513            existing = self.list_custom_source_definitions(
514                definition_type="yaml" if is_yaml else "docker",
515            )
516            if any(d.name == name for d in existing):
517                raise exc.AirbyteDuplicateResourcesError(
518                    resource_type="custom_source_definition",
519                    resource_name=name,
520                )
521
522        if is_yaml:
523            manifest_dict: dict[str, Any]
524            if isinstance(manifest_yaml, Path):
525                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
526            elif isinstance(manifest_yaml, str):
527                manifest_dict = yaml.safe_load(manifest_yaml)
528            elif manifest_yaml is not None:
529                manifest_dict = manifest_yaml
530            else:
531                raise exc.PyAirbyteInputError(
532                    message="manifest_yaml is required for YAML connectors",
533                    context={"name": name},
534                )
535
536            if pre_validate:
537                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
538
539            result = api_util.create_custom_yaml_source_definition(
540                name=name,
541                workspace_id=self.workspace_id,
542                manifest=manifest_dict,
543                api_root=self.api_root,
544                client_id=self.client_id,
545                client_secret=self.client_secret,
546            )
547            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
548
549        raise NotImplementedError(
550            "Docker custom source definitions are not yet supported. "
551            "Only YAML manifest-based custom sources are currently available."
552        )
553
554    def list_custom_source_definitions(
555        self,
556        *,
557        definition_type: Literal["yaml", "docker"],
558    ) -> list[CustomCloudSourceDefinition]:
559        """List custom source connector definitions.
560
561        Args:
562            definition_type: Connector type to list ("yaml" or "docker"). Required.
563
564        Returns:
565            List of CustomCloudSourceDefinition objects matching the specified type
566        """
567        if definition_type == "yaml":
568            yaml_definitions = api_util.list_custom_yaml_source_definitions(
569                workspace_id=self.workspace_id,
570                api_root=self.api_root,
571                client_id=self.client_id,
572                client_secret=self.client_secret,
573            )
574            return [
575                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
576                for d in yaml_definitions
577            ]
578
579        raise NotImplementedError(
580            "Docker custom source definitions are not yet supported. "
581            "Only YAML manifest-based custom sources are currently available."
582        )
583
584    def get_custom_source_definition(
585        self,
586        definition_id: str,
587        *,
588        definition_type: Literal["yaml", "docker"],
589    ) -> CustomCloudSourceDefinition:
590        """Get a specific custom source definition by ID.
591
592        Args:
593            definition_id: The definition ID
594            definition_type: Connector type ("yaml" or "docker"). Required.
595
596        Returns:
597            CustomCloudSourceDefinition object
598        """
599        if definition_type == "yaml":
600            result = api_util.get_custom_yaml_source_definition(
601                workspace_id=self.workspace_id,
602                definition_id=definition_id,
603                api_root=self.api_root,
604                client_id=self.client_id,
605                client_secret=self.client_secret,
606            )
607            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
608
609        raise NotImplementedError(
610            "Docker custom source definitions are not yet supported. "
611            "Only YAML manifest-based custom sources are currently available."
612        )
@dataclass
class CloudWorkspace:
 64@dataclass
 65class CloudWorkspace:
 66    """A remote workspace on the Airbyte Cloud.
 67
 68    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 69    instances, both OSS and Enterprise.
 70    """
 71
 72    workspace_id: str
 73    client_id: SecretString
 74    client_secret: SecretString
 75    api_root: str = api_util.CLOUD_API_ROOT
 76
 77    def __post_init__(self) -> None:
 78        """Ensure that the client ID and secret are handled securely."""
 79        self.client_id = SecretString(self.client_id)
 80        self.client_secret = SecretString(self.client_secret)
 81
 82    @property
 83    def workspace_url(self) -> str | None:
 84        """The web URL of the workspace."""
 85        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 86
 87    # Test connection and creds
 88
 89    def connect(self) -> None:
 90        """Check that the workspace is reachable and raise an exception otherwise.
 91
 92        Note: It is not necessary to call this method before calling other operations. It
 93              serves primarily as a simple check to ensure that the workspace is reachable
 94              and credentials are correct.
 95        """
 96        _ = api_util.get_workspace(
 97            api_root=self.api_root,
 98            workspace_id=self.workspace_id,
 99            client_id=self.client_id,
100            client_secret=self.client_secret,
101        )
102        print(f"Successfully connected to workspace: {self.workspace_url}")
103
104    # Get sources, destinations, and connections
105
106    def get_connection(
107        self,
108        connection_id: str,
109    ) -> CloudConnection:
110        """Get a connection by ID.
111
112        This method does not fetch data from the API. It returns a `CloudConnection` object,
113        which will be loaded lazily as needed.
114        """
115        return CloudConnection(
116            workspace=self,
117            connection_id=connection_id,
118        )
119
120    def get_source(
121        self,
122        source_id: str,
123    ) -> CloudSource:
124        """Get a source by ID.
125
126        This method does not fetch data from the API. It returns a `CloudSource` object,
127        which will be loaded lazily as needed.
128        """
129        return CloudSource(
130            workspace=self,
131            connector_id=source_id,
132        )
133
134    def get_destination(
135        self,
136        destination_id: str,
137    ) -> CloudDestination:
138        """Get a destination by ID.
139
140        This method does not fetch data from the API. It returns a `CloudDestination` object,
141        which will be loaded lazily as needed.
142        """
143        return CloudDestination(
144            workspace=self,
145            connector_id=destination_id,
146        )
147
148    # Deploy sources and destinations
149
150    def deploy_source(
151        self,
152        name: str,
153        source: Source,
154        *,
155        unique: bool = True,
156        random_name_suffix: bool = False,
157    ) -> CloudSource:
158        """Deploy a source to the workspace.
159
160        Returns the newly deployed source.
161
162        Args:
163            name: The name to use when deploying.
164            source: The source object to deploy.
165            unique: Whether to require a unique name. If `True`, duplicate names
166                are not allowed. Defaults to `True`.
167            random_name_suffix: Whether to append a random suffix to the name.
168        """
169        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
170        source_config_dict["sourceType"] = source.name.replace("source-", "")
171
172        if random_name_suffix:
173            name += f" (ID: {text_util.generate_random_suffix()})"
174
175        if unique:
176            existing = self.list_sources(name=name)
177            if existing:
178                raise exc.AirbyteDuplicateResourcesError(
179                    resource_type="source",
180                    resource_name=name,
181                )
182
183        deployed_source = api_util.create_source(
184            name=name,
185            api_root=self.api_root,
186            workspace_id=self.workspace_id,
187            config=source_config_dict,
188            client_id=self.client_id,
189            client_secret=self.client_secret,
190        )
191        return CloudSource(
192            workspace=self,
193            connector_id=deployed_source.source_id,
194        )
195
196    def deploy_destination(
197        self,
198        name: str,
199        destination: Destination | dict[str, Any],
200        *,
201        unique: bool = True,
202        random_name_suffix: bool = False,
203    ) -> CloudDestination:
204        """Deploy a destination to the workspace.
205
206        Returns the newly deployed destination ID.
207
208        Args:
209            name: The name to use when deploying.
210            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
211                dictionary of configuration values.
212            unique: Whether to require a unique name. If `True`, duplicate names
213                are not allowed. Defaults to `True`.
214            random_name_suffix: Whether to append a random suffix to the name.
215        """
216        if isinstance(destination, Destination):
217            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
218            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
219            # raise ValueError(destination_conf_dict)
220        else:
221            destination_conf_dict = destination.copy()
222            if "destinationType" not in destination_conf_dict:
223                raise exc.PyAirbyteInputError(
224                    message="Missing `destinationType` in configuration dictionary.",
225                )
226
227        if random_name_suffix:
228            name += f" (ID: {text_util.generate_random_suffix()})"
229
230        if unique:
231            existing = self.list_destinations(name=name)
232            if existing:
233                raise exc.AirbyteDuplicateResourcesError(
234                    resource_type="destination",
235                    resource_name=name,
236                )
237
238        deployed_destination = api_util.create_destination(
239            name=name,
240            api_root=self.api_root,
241            workspace_id=self.workspace_id,
242            config=destination_conf_dict,  # Wants a dataclass but accepts dict
243            client_id=self.client_id,
244            client_secret=self.client_secret,
245        )
246        return CloudDestination(
247            workspace=self,
248            connector_id=deployed_destination.destination_id,
249        )
250
251    def permanently_delete_source(
252        self,
253        source: str | CloudSource,
254    ) -> None:
255        """Delete a source from the workspace.
256
257        You can pass either the source ID `str` or a deployed `Source` object.
258        """
259        if not isinstance(source, (str, CloudSource)):
260            raise exc.PyAirbyteInputError(
261                message="Invalid source type.",
262                input_value=type(source).__name__,
263            )
264
265        api_util.delete_source(
266            source_id=source.connector_id if isinstance(source, CloudSource) else source,
267            api_root=self.api_root,
268            client_id=self.client_id,
269            client_secret=self.client_secret,
270        )
271
272    # Deploy and delete destinations
273
274    def permanently_delete_destination(
275        self,
276        destination: str | CloudDestination,
277    ) -> None:
278        """Delete a deployed destination from the workspace.
279
280        You can pass either the `Cache` class or the deployed destination ID as a `str`.
281        """
282        if not isinstance(destination, (str, CloudDestination)):
283            raise exc.PyAirbyteInputError(
284                message="Invalid destination type.",
285                input_value=type(destination).__name__,
286            )
287
288        api_util.delete_destination(
289            destination_id=(
290                destination if isinstance(destination, str) else destination.destination_id
291            ),
292            api_root=self.api_root,
293            client_id=self.client_id,
294            client_secret=self.client_secret,
295        )
296
297    # Deploy and delete connections
298
299    def deploy_connection(
300        self,
301        connection_name: str,
302        *,
303        source: CloudSource | str,
304        selected_streams: list[str],
305        destination: CloudDestination | str,
306        table_prefix: str | None = None,
307    ) -> CloudConnection:
308        """Create a new connection between an already deployed source and destination.
309
310        Returns the newly deployed connection object.
311
312        Args:
313            connection_name: The name of the connection.
314            source: The deployed source. You can pass a source ID or a CloudSource object.
315            destination: The deployed destination. You can pass a destination ID or a
316                CloudDestination object.
317            table_prefix: Optional. The table prefix to use when syncing to the destination.
318            selected_streams: The selected stream names to sync within the connection.
319        """
320        if not selected_streams:
321            raise exc.PyAirbyteInputError(
322                guidance="You must provide `selected_streams` when creating a connection."
323            )
324
325        source_id: str = source if isinstance(source, str) else source.connector_id
326        destination_id: str = (
327            destination if isinstance(destination, str) else destination.connector_id
328        )
329
330        deployed_connection = api_util.create_connection(
331            name=connection_name,
332            source_id=source_id,
333            destination_id=destination_id,
334            api_root=self.api_root,
335            workspace_id=self.workspace_id,
336            selected_stream_names=selected_streams,
337            prefix=table_prefix or "",
338            client_id=self.client_id,
339            client_secret=self.client_secret,
340        )
341
342        return CloudConnection(
343            workspace=self,
344            connection_id=deployed_connection.connection_id,
345            source=deployed_connection.source_id,
346            destination=deployed_connection.destination_id,
347        )
348
349    def permanently_delete_connection(
350        self,
351        connection: str | CloudConnection,
352        *,
353        cascade_delete_source: bool = False,
354        cascade_delete_destination: bool = False,
355    ) -> None:
356        """Delete a deployed connection from the workspace."""
357        if connection is None:
358            raise ValueError("No connection ID provided.")
359
360        if isinstance(connection, str):
361            connection = CloudConnection(
362                workspace=self,
363                connection_id=connection,
364            )
365
366        api_util.delete_connection(
367            connection_id=connection.connection_id,
368            api_root=self.api_root,
369            workspace_id=self.workspace_id,
370            client_id=self.client_id,
371            client_secret=self.client_secret,
372        )
373
374        if cascade_delete_source:
375            self.permanently_delete_source(source=connection.source_id)
376        if cascade_delete_destination:
377            self.permanently_delete_destination(destination=connection.destination_id)
378
379    # List sources, destinations, and connections
380
381    def list_connections(
382        self,
383        name: str | None = None,
384        *,
385        name_filter: Callable | None = None,
386    ) -> list[CloudConnection]:
387        """List connections by name in the workspace.
388
389        TODO: Add pagination support
390        """
391        connections = api_util.list_connections(
392            api_root=self.api_root,
393            workspace_id=self.workspace_id,
394            name=name,
395            name_filter=name_filter,
396            client_id=self.client_id,
397            client_secret=self.client_secret,
398        )
399        return [
400            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
401                workspace=self,
402                connection_response=connection,
403            )
404            for connection in connections
405            if name is None or connection.name == name
406        ]
407
408    def list_sources(
409        self,
410        name: str | None = None,
411        *,
412        name_filter: Callable | None = None,
413    ) -> list[CloudSource]:
414        """List all sources in the workspace.
415
416        TODO: Add pagination support
417        """
418        sources = api_util.list_sources(
419            api_root=self.api_root,
420            workspace_id=self.workspace_id,
421            name=name,
422            name_filter=name_filter,
423            client_id=self.client_id,
424            client_secret=self.client_secret,
425        )
426        return [
427            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
428                workspace=self,
429                source_response=source,
430            )
431            for source in sources
432            if name is None or source.name == name
433        ]
434
435    def list_destinations(
436        self,
437        name: str | None = None,
438        *,
439        name_filter: Callable | None = None,
440    ) -> list[CloudDestination]:
441        """List all destinations in the workspace.
442
443        TODO: Add pagination support
444        """
445        destinations = api_util.list_destinations(
446            api_root=self.api_root,
447            workspace_id=self.workspace_id,
448            name=name,
449            name_filter=name_filter,
450            client_id=self.client_id,
451            client_secret=self.client_secret,
452        )
453        return [
454            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
455                workspace=self,
456                destination_response=destination,
457            )
458            for destination in destinations
459            if name is None or destination.name == name
460        ]
461
462    def publish_custom_source_definition(
463        self,
464        name: str,
465        *,
466        manifest_yaml: dict[str, Any] | Path | str | None = None,
467        docker_image: str | None = None,
468        docker_tag: str | None = None,
469        unique: bool = True,
470        pre_validate: bool = True,
471    ) -> CustomCloudSourceDefinition:
472        """Publish a custom source connector definition.
473
474        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
475        and docker_tag (for Docker connectors), but not both.
476
477        Args:
478            name: Display name for the connector definition
479            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
480            docker_image: Docker repository (e.g., 'airbyte/source-custom')
481            docker_tag: Docker image tag (e.g., '1.0.0')
482            unique: Whether to enforce name uniqueness
483            pre_validate: Whether to validate manifest client-side (YAML only)
484
485        Returns:
486            CustomCloudSourceDefinition object representing the created definition
487
488        Raises:
489            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
490            AirbyteDuplicateResourcesError: If unique=True and name already exists
491        """
492        is_yaml = manifest_yaml is not None
493        is_docker = docker_image is not None
494
495        if is_yaml == is_docker:
496            raise exc.PyAirbyteInputError(
497                message=(
498                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
499                    "docker_image + docker_tag (for Docker connectors), but not both"
500                ),
501                context={
502                    "manifest_yaml_provided": is_yaml,
503                    "docker_image_provided": is_docker,
504                },
505            )
506
507        if is_docker and docker_tag is None:
508            raise exc.PyAirbyteInputError(
509                message="docker_tag is required when docker_image is specified",
510                context={"docker_image": docker_image},
511            )
512
513        if unique:
514            existing = self.list_custom_source_definitions(
515                definition_type="yaml" if is_yaml else "docker",
516            )
517            if any(d.name == name for d in existing):
518                raise exc.AirbyteDuplicateResourcesError(
519                    resource_type="custom_source_definition",
520                    resource_name=name,
521                )
522
523        if is_yaml:
524            manifest_dict: dict[str, Any]
525            if isinstance(manifest_yaml, Path):
526                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
527            elif isinstance(manifest_yaml, str):
528                manifest_dict = yaml.safe_load(manifest_yaml)
529            elif manifest_yaml is not None:
530                manifest_dict = manifest_yaml
531            else:
532                raise exc.PyAirbyteInputError(
533                    message="manifest_yaml is required for YAML connectors",
534                    context={"name": name},
535                )
536
537            if pre_validate:
538                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
539
540            result = api_util.create_custom_yaml_source_definition(
541                name=name,
542                workspace_id=self.workspace_id,
543                manifest=manifest_dict,
544                api_root=self.api_root,
545                client_id=self.client_id,
546                client_secret=self.client_secret,
547            )
548            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
549
550        raise NotImplementedError(
551            "Docker custom source definitions are not yet supported. "
552            "Only YAML manifest-based custom sources are currently available."
553        )
554
555    def list_custom_source_definitions(
556        self,
557        *,
558        definition_type: Literal["yaml", "docker"],
559    ) -> list[CustomCloudSourceDefinition]:
560        """List custom source connector definitions.
561
562        Args:
563            definition_type: Connector type to list ("yaml" or "docker"). Required.
564
565        Returns:
566            List of CustomCloudSourceDefinition objects matching the specified type
567        """
568        if definition_type == "yaml":
569            yaml_definitions = api_util.list_custom_yaml_source_definitions(
570                workspace_id=self.workspace_id,
571                api_root=self.api_root,
572                client_id=self.client_id,
573                client_secret=self.client_secret,
574            )
575            return [
576                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
577                for d in yaml_definitions
578            ]
579
580        raise NotImplementedError(
581            "Docker custom source definitions are not yet supported. "
582            "Only YAML manifest-based custom sources are currently available."
583        )
584
585    def get_custom_source_definition(
586        self,
587        definition_id: str,
588        *,
589        definition_type: Literal["yaml", "docker"],
590    ) -> CustomCloudSourceDefinition:
591        """Get a specific custom source definition by ID.
592
593        Args:
594            definition_id: The definition ID
595            definition_type: Connector type ("yaml" or "docker"). Required.
596
597        Returns:
598            CustomCloudSourceDefinition object
599        """
600        if definition_type == "yaml":
601            result = api_util.get_custom_yaml_source_definition(
602                workspace_id=self.workspace_id,
603                definition_id=definition_id,
604                api_root=self.api_root,
605                client_id=self.client_id,
606                client_secret=self.client_secret,
607            )
608            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
609
610        raise NotImplementedError(
611            "Docker custom source definitions are not yet supported. "
612            "Only YAML manifest-based custom sources are currently available."
613        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
82    @property
83    def workspace_url(self) -> str | None:
84        """The web URL of the workspace."""
85        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def connect(self) -> None:
 89    def connect(self) -> None:
 90        """Check that the workspace is reachable and raise an exception otherwise.
 91
 92        Note: It is not necessary to call this method before calling other operations. It
 93              serves primarily as a simple check to ensure that the workspace is reachable
 94              and credentials are correct.
 95        """
 96        _ = api_util.get_workspace(
 97            api_root=self.api_root,
 98            workspace_id=self.workspace_id,
 99            client_id=self.client_id,
100            client_secret=self.client_secret,
101        )
102        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
106    def get_connection(
107        self,
108        connection_id: str,
109    ) -> CloudConnection:
110        """Get a connection by ID.
111
112        This method does not fetch data from the API. It returns a `CloudConnection` object,
113        which will be loaded lazily as needed.
114        """
115        return CloudConnection(
116            workspace=self,
117            connection_id=connection_id,
118        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
120    def get_source(
121        self,
122        source_id: str,
123    ) -> CloudSource:
124        """Get a source by ID.
125
126        This method does not fetch data from the API. It returns a `CloudSource` object,
127        which will be loaded lazily as needed.
128        """
129        return CloudSource(
130            workspace=self,
131            connector_id=source_id,
132        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
134    def get_destination(
135        self,
136        destination_id: str,
137    ) -> CloudDestination:
138        """Get a destination by ID.
139
140        This method does not fetch data from the API. It returns a `CloudDestination` object,
141        which will be loaded lazily as needed.
142        """
143        return CloudDestination(
144            workspace=self,
145            connector_id=destination_id,
146        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
150    def deploy_source(
151        self,
152        name: str,
153        source: Source,
154        *,
155        unique: bool = True,
156        random_name_suffix: bool = False,
157    ) -> CloudSource:
158        """Deploy a source to the workspace.
159
160        Returns the newly deployed source.
161
162        Args:
163            name: The name to use when deploying.
164            source: The source object to deploy.
165            unique: Whether to require a unique name. If `True`, duplicate names
166                are not allowed. Defaults to `True`.
167            random_name_suffix: Whether to append a random suffix to the name.
168        """
169        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
170        source_config_dict["sourceType"] = source.name.replace("source-", "")
171
172        if random_name_suffix:
173            name += f" (ID: {text_util.generate_random_suffix()})"
174
175        if unique:
176            existing = self.list_sources(name=name)
177            if existing:
178                raise exc.AirbyteDuplicateResourcesError(
179                    resource_type="source",
180                    resource_name=name,
181                )
182
183        deployed_source = api_util.create_source(
184            name=name,
185            api_root=self.api_root,
186            workspace_id=self.workspace_id,
187            config=source_config_dict,
188            client_id=self.client_id,
189            client_secret=self.client_secret,
190        )
191        return CloudSource(
192            workspace=self,
193            connector_id=deployed_source.source_id,
194        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
196    def deploy_destination(
197        self,
198        name: str,
199        destination: Destination | dict[str, Any],
200        *,
201        unique: bool = True,
202        random_name_suffix: bool = False,
203    ) -> CloudDestination:
204        """Deploy a destination to the workspace.
205
206        Returns the newly deployed destination ID.
207
208        Args:
209            name: The name to use when deploying.
210            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
211                dictionary of configuration values.
212            unique: Whether to require a unique name. If `True`, duplicate names
213                are not allowed. Defaults to `True`.
214            random_name_suffix: Whether to append a random suffix to the name.
215        """
216        if isinstance(destination, Destination):
217            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
218            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
219            # raise ValueError(destination_conf_dict)
220        else:
221            destination_conf_dict = destination.copy()
222            if "destinationType" not in destination_conf_dict:
223                raise exc.PyAirbyteInputError(
224                    message="Missing `destinationType` in configuration dictionary.",
225                )
226
227        if random_name_suffix:
228            name += f" (ID: {text_util.generate_random_suffix()})"
229
230        if unique:
231            existing = self.list_destinations(name=name)
232            if existing:
233                raise exc.AirbyteDuplicateResourcesError(
234                    resource_type="destination",
235                    resource_name=name,
236                )
237
238        deployed_destination = api_util.create_destination(
239            name=name,
240            api_root=self.api_root,
241            workspace_id=self.workspace_id,
242            config=destination_conf_dict,  # Wants a dataclass but accepts dict
243            client_id=self.client_id,
244            client_secret=self.client_secret,
245        )
246        return CloudDestination(
247            workspace=self,
248            connector_id=deployed_destination.destination_id,
249        )

Deploy a destination to the workspace.

Returns the newly deployed destination ID.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
251    def permanently_delete_source(
252        self,
253        source: str | CloudSource,
254    ) -> None:
255        """Delete a source from the workspace.
256
257        You can pass either the source ID `str` or a deployed `Source` object.
258        """
259        if not isinstance(source, (str, CloudSource)):
260            raise exc.PyAirbyteInputError(
261                message="Invalid source type.",
262                input_value=type(source).__name__,
263            )
264
265        api_util.delete_source(
266            source_id=source.connector_id if isinstance(source, CloudSource) else source,
267            api_root=self.api_root,
268            client_id=self.client_id,
269            client_secret=self.client_secret,
270        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:
274    def permanently_delete_destination(
275        self,
276        destination: str | CloudDestination,
277    ) -> None:
278        """Delete a deployed destination from the workspace.
279
280        You can pass either the `Cache` class or the deployed destination ID as a `str`.
281        """
282        if not isinstance(destination, (str, CloudDestination)):
283            raise exc.PyAirbyteInputError(
284                message="Invalid destination type.",
285                input_value=type(destination).__name__,
286            )
287
288        api_util.delete_destination(
289            destination_id=(
290                destination if isinstance(destination, str) else destination.destination_id
291            ),
292            api_root=self.api_root,
293            client_id=self.client_id,
294            client_secret=self.client_secret,
295        )

Delete a deployed destination from the workspace.

You can pass either the Cache class or the deployed destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
299    def deploy_connection(
300        self,
301        connection_name: str,
302        *,
303        source: CloudSource | str,
304        selected_streams: list[str],
305        destination: CloudDestination | str,
306        table_prefix: str | None = None,
307    ) -> CloudConnection:
308        """Create a new connection between an already deployed source and destination.
309
310        Returns the newly deployed connection object.
311
312        Args:
313            connection_name: The name of the connection.
314            source: The deployed source. You can pass a source ID or a CloudSource object.
315            destination: The deployed destination. You can pass a destination ID or a
316                CloudDestination object.
317            table_prefix: Optional. The table prefix to use when syncing to the destination.
318            selected_streams: The selected stream names to sync within the connection.
319        """
320        if not selected_streams:
321            raise exc.PyAirbyteInputError(
322                guidance="You must provide `selected_streams` when creating a connection."
323            )
324
325        source_id: str = source if isinstance(source, str) else source.connector_id
326        destination_id: str = (
327            destination if isinstance(destination, str) else destination.connector_id
328        )
329
330        deployed_connection = api_util.create_connection(
331            name=connection_name,
332            source_id=source_id,
333            destination_id=destination_id,
334            api_root=self.api_root,
335            workspace_id=self.workspace_id,
336            selected_stream_names=selected_streams,
337            prefix=table_prefix or "",
338            client_id=self.client_id,
339            client_secret=self.client_secret,
340        )
341
342        return CloudConnection(
343            workspace=self,
344            connection_id=deployed_connection.connection_id,
345            source=deployed_connection.source_id,
346            destination=deployed_connection.destination_id,
347        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
349    def permanently_delete_connection(
350        self,
351        connection: str | CloudConnection,
352        *,
353        cascade_delete_source: bool = False,
354        cascade_delete_destination: bool = False,
355    ) -> None:
356        """Delete a deployed connection from the workspace."""
357        if connection is None:
358            raise ValueError("No connection ID provided.")
359
360        if isinstance(connection, str):
361            connection = CloudConnection(
362                workspace=self,
363                connection_id=connection,
364            )
365
366        api_util.delete_connection(
367            connection_id=connection.connection_id,
368            api_root=self.api_root,
369            workspace_id=self.workspace_id,
370            client_id=self.client_id,
371            client_secret=self.client_secret,
372        )
373
374        if cascade_delete_source:
375            self.permanently_delete_source(source=connection.source_id)
376        if cascade_delete_destination:
377            self.permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
381    def list_connections(
382        self,
383        name: str | None = None,
384        *,
385        name_filter: Callable | None = None,
386    ) -> list[CloudConnection]:
387        """List connections by name in the workspace.
388
389        TODO: Add pagination support
390        """
391        connections = api_util.list_connections(
392            api_root=self.api_root,
393            workspace_id=self.workspace_id,
394            name=name,
395            name_filter=name_filter,
396            client_id=self.client_id,
397            client_secret=self.client_secret,
398        )
399        return [
400            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
401                workspace=self,
402                connection_response=connection,
403            )
404            for connection in connections
405            if name is None or connection.name == name
406        ]

List connections by name in the workspace.

TODO: Add pagination support

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
408    def list_sources(
409        self,
410        name: str | None = None,
411        *,
412        name_filter: Callable | None = None,
413    ) -> list[CloudSource]:
414        """List all sources in the workspace.
415
416        TODO: Add pagination support
417        """
418        sources = api_util.list_sources(
419            api_root=self.api_root,
420            workspace_id=self.workspace_id,
421            name=name,
422            name_filter=name_filter,
423            client_id=self.client_id,
424            client_secret=self.client_secret,
425        )
426        return [
427            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
428                workspace=self,
429                source_response=source,
430            )
431            for source in sources
432            if name is None or source.name == name
433        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
435    def list_destinations(
436        self,
437        name: str | None = None,
438        *,
439        name_filter: Callable | None = None,
440    ) -> list[CloudDestination]:
441        """List all destinations in the workspace.
442
443        TODO: Add pagination support
444        """
445        destinations = api_util.list_destinations(
446            api_root=self.api_root,
447            workspace_id=self.workspace_id,
448            name=name,
449            name_filter=name_filter,
450            client_id=self.client_id,
451            client_secret=self.client_secret,
452        )
453        return [
454            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
455                workspace=self,
456                destination_response=destination,
457            )
458            for destination in destinations
459            if name is None or destination.name == name
460        ]

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
462    def publish_custom_source_definition(
463        self,
464        name: str,
465        *,
466        manifest_yaml: dict[str, Any] | Path | str | None = None,
467        docker_image: str | None = None,
468        docker_tag: str | None = None,
469        unique: bool = True,
470        pre_validate: bool = True,
471    ) -> CustomCloudSourceDefinition:
472        """Publish a custom source connector definition.
473
474        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
475        and docker_tag (for Docker connectors), but not both.
476
477        Args:
478            name: Display name for the connector definition
479            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
480            docker_image: Docker repository (e.g., 'airbyte/source-custom')
481            docker_tag: Docker image tag (e.g., '1.0.0')
482            unique: Whether to enforce name uniqueness
483            pre_validate: Whether to validate manifest client-side (YAML only)
484
485        Returns:
486            CustomCloudSourceDefinition object representing the created definition
487
488        Raises:
489            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
490            AirbyteDuplicateResourcesError: If unique=True and name already exists
491        """
492        is_yaml = manifest_yaml is not None
493        is_docker = docker_image is not None
494
495        if is_yaml == is_docker:
496            raise exc.PyAirbyteInputError(
497                message=(
498                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
499                    "docker_image + docker_tag (for Docker connectors), but not both"
500                ),
501                context={
502                    "manifest_yaml_provided": is_yaml,
503                    "docker_image_provided": is_docker,
504                },
505            )
506
507        if is_docker and docker_tag is None:
508            raise exc.PyAirbyteInputError(
509                message="docker_tag is required when docker_image is specified",
510                context={"docker_image": docker_image},
511            )
512
513        if unique:
514            existing = self.list_custom_source_definitions(
515                definition_type="yaml" if is_yaml else "docker",
516            )
517            if any(d.name == name for d in existing):
518                raise exc.AirbyteDuplicateResourcesError(
519                    resource_type="custom_source_definition",
520                    resource_name=name,
521                )
522
523        if is_yaml:
524            manifest_dict: dict[str, Any]
525            if isinstance(manifest_yaml, Path):
526                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
527            elif isinstance(manifest_yaml, str):
528                manifest_dict = yaml.safe_load(manifest_yaml)
529            elif manifest_yaml is not None:
530                manifest_dict = manifest_yaml
531            else:
532                raise exc.PyAirbyteInputError(
533                    message="manifest_yaml is required for YAML connectors",
534                    context={"name": name},
535                )
536
537            if pre_validate:
538                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
539
540            result = api_util.create_custom_yaml_source_definition(
541                name=name,
542                workspace_id=self.workspace_id,
543                manifest=manifest_dict,
544                api_root=self.api_root,
545                client_id=self.client_id,
546                client_secret=self.client_secret,
547            )
548            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
549
550        raise NotImplementedError(
551            "Docker custom source definitions are not yet supported. "
552            "Only YAML manifest-based custom sources are currently available."
553        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
555    def list_custom_source_definitions(
556        self,
557        *,
558        definition_type: Literal["yaml", "docker"],
559    ) -> list[CustomCloudSourceDefinition]:
560        """List custom source connector definitions.
561
562        Args:
563            definition_type: Connector type to list ("yaml" or "docker"). Required.
564
565        Returns:
566            List of CustomCloudSourceDefinition objects matching the specified type
567        """
568        if definition_type == "yaml":
569            yaml_definitions = api_util.list_custom_yaml_source_definitions(
570                workspace_id=self.workspace_id,
571                api_root=self.api_root,
572                client_id=self.client_id,
573                client_secret=self.client_secret,
574            )
575            return [
576                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
577                for d in yaml_definitions
578            ]
579
580        raise NotImplementedError(
581            "Docker custom source definitions are not yet supported. "
582            "Only YAML manifest-based custom sources are currently available."
583        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
585    def get_custom_source_definition(
586        self,
587        definition_id: str,
588        *,
589        definition_type: Literal["yaml", "docker"],
590    ) -> CustomCloudSourceDefinition:
591        """Get a specific custom source definition by ID.
592
593        Args:
594            definition_id: The definition ID
595            definition_type: Connector type ("yaml" or "docker"). Required.
596
597        Returns:
598            CustomCloudSourceDefinition object
599        """
600        if definition_type == "yaml":
601            result = api_util.get_custom_yaml_source_definition(
602                workspace_id=self.workspace_id,
603                definition_id=definition_id,
604                api_root=self.api_root,
605                client_id=self.client_id,
606                client_secret=self.client_secret,
607            )
608            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
609
610        raise NotImplementedError(
611            "Docker custom source definitions are not yet supported. "
612            "Only YAML manifest-based custom sources are currently available."
613        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object