airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass
 39from typing import TYPE_CHECKING, Any
 40
 41from airbyte import exceptions as exc
 42from airbyte._util import api_util, text_util
 43from airbyte._util.api_util import get_web_url_root
 44from airbyte.cloud.connections import CloudConnection
 45from airbyte.cloud.connectors import CloudDestination, CloudSource
 46from airbyte.destinations.base import Destination
 47from airbyte.secrets.base import SecretString
 48
 49
 50if TYPE_CHECKING:
 51    from collections.abc import Callable
 52
 53    from airbyte.sources.base import Source
 54
 55
 56@dataclass
 57class CloudWorkspace:
 58    """A remote workspace on the Airbyte Cloud.
 59
 60    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 61    instances, both OSS and Enterprise.
 62    """
 63
 64    workspace_id: str
 65    client_id: SecretString
 66    client_secret: SecretString
 67    api_root: str = api_util.CLOUD_API_ROOT
 68
 69    def __post_init__(self) -> None:
 70        """Ensure that the client ID and secret are handled securely."""
 71        self.client_id = SecretString(self.client_id)
 72        self.client_secret = SecretString(self.client_secret)
 73
 74    @property
 75    def workspace_url(self) -> str | None:
 76        """The web URL of the workspace."""
 77        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 78
 79    # Test connection and creds
 80
 81    def connect(self) -> None:
 82        """Check that the workspace is reachable and raise an exception otherwise.
 83
 84        Note: It is not necessary to call this method before calling other operations. It
 85              serves primarily as a simple check to ensure that the workspace is reachable
 86              and credentials are correct.
 87        """
 88        _ = api_util.get_workspace(
 89            api_root=self.api_root,
 90            workspace_id=self.workspace_id,
 91            client_id=self.client_id,
 92            client_secret=self.client_secret,
 93        )
 94        print(f"Successfully connected to workspace: {self.workspace_url}")
 95
 96    # Get sources, destinations, and connections
 97
 98    def get_connection(
 99        self,
100        connection_id: str,
101    ) -> CloudConnection:
102        """Get a connection by ID.
103
104        This method does not fetch data from the API. It returns a `CloudConnection` object,
105        which will be loaded lazily as needed.
106        """
107        return CloudConnection(
108            workspace=self,
109            connection_id=connection_id,
110        )
111
112    def get_source(
113        self,
114        source_id: str,
115    ) -> CloudSource:
116        """Get a source by ID.
117
118        This method does not fetch data from the API. It returns a `CloudSource` object,
119        which will be loaded lazily as needed.
120        """
121        return CloudSource(
122            workspace=self,
123            connector_id=source_id,
124        )
125
126    def get_destination(
127        self,
128        destination_id: str,
129    ) -> CloudDestination:
130        """Get a destination by ID.
131
132        This method does not fetch data from the API. It returns a `CloudDestination` object,
133        which will be loaded lazily as needed.
134        """
135        return CloudDestination(
136            workspace=self,
137            connector_id=destination_id,
138        )
139
140    # Deploy sources and destinations
141
142    def deploy_source(
143        self,
144        name: str,
145        source: Source,
146        *,
147        unique: bool = True,
148        random_name_suffix: bool = False,
149    ) -> CloudSource:
150        """Deploy a source to the workspace.
151
152        Returns the newly deployed source.
153
154        Args:
155            name: The name to use when deploying.
156            source: The source object to deploy.
157            unique: Whether to require a unique name. If `True`, duplicate names
158                are not allowed. Defaults to `True`.
159            random_name_suffix: Whether to append a random suffix to the name.
160        """
161        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
162        source_config_dict["sourceType"] = source.name.replace("source-", "")
163
164        if random_name_suffix:
165            name += f" (ID: {text_util.generate_random_suffix()})"
166
167        if unique:
168            existing = self.list_sources(name=name)
169            if existing:
170                raise exc.AirbyteDuplicateResourcesError(
171                    resource_type="source",
172                    resource_name=name,
173                )
174
175        deployed_source = api_util.create_source(
176            name=name,
177            api_root=self.api_root,
178            workspace_id=self.workspace_id,
179            config=source_config_dict,
180            client_id=self.client_id,
181            client_secret=self.client_secret,
182        )
183        return CloudSource(
184            workspace=self,
185            connector_id=deployed_source.source_id,
186        )
187
188    def deploy_destination(
189        self,
190        name: str,
191        destination: Destination | dict[str, Any],
192        *,
193        unique: bool = True,
194        random_name_suffix: bool = False,
195    ) -> CloudDestination:
196        """Deploy a destination to the workspace.
197
198        Returns the newly deployed destination ID.
199
200        Args:
201            name: The name to use when deploying.
202            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
203                dictionary of configuration values.
204            unique: Whether to require a unique name. If `True`, duplicate names
205                are not allowed. Defaults to `True`.
206            random_name_suffix: Whether to append a random suffix to the name.
207        """
208        if isinstance(destination, Destination):
209            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
210            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
211            # raise ValueError(destination_conf_dict)
212        else:
213            destination_conf_dict = destination.copy()
214            if "destinationType" not in destination_conf_dict:
215                raise exc.PyAirbyteInputError(
216                    message="Missing `destinationType` in configuration dictionary.",
217                )
218
219        if random_name_suffix:
220            name += f" (ID: {text_util.generate_random_suffix()})"
221
222        if unique:
223            existing = self.list_destinations(name=name)
224            if existing:
225                raise exc.AirbyteDuplicateResourcesError(
226                    resource_type="destination",
227                    resource_name=name,
228                )
229
230        deployed_destination = api_util.create_destination(
231            name=name,
232            api_root=self.api_root,
233            workspace_id=self.workspace_id,
234            config=destination_conf_dict,  # Wants a dataclass but accepts dict
235            client_id=self.client_id,
236            client_secret=self.client_secret,
237        )
238        return CloudDestination(
239            workspace=self,
240            connector_id=deployed_destination.destination_id,
241        )
242
243    def permanently_delete_source(
244        self,
245        source: str | CloudSource,
246    ) -> None:
247        """Delete a source from the workspace.
248
249        You can pass either the source ID `str` or a deployed `Source` object.
250        """
251        if not isinstance(source, (str, CloudSource)):
252            raise exc.PyAirbyteInputError(
253                message="Invalid source type.",
254                input_value=type(source).__name__,
255            )
256
257        api_util.delete_source(
258            source_id=source.connector_id if isinstance(source, CloudSource) else source,
259            api_root=self.api_root,
260            client_id=self.client_id,
261            client_secret=self.client_secret,
262        )
263
264    # Deploy and delete destinations
265
266    def permanently_delete_destination(
267        self,
268        destination: str | CloudDestination,
269    ) -> None:
270        """Delete a deployed destination from the workspace.
271
272        You can pass either the `Cache` class or the deployed destination ID as a `str`.
273        """
274        if not isinstance(destination, (str, CloudDestination)):
275            raise exc.PyAirbyteInputError(
276                message="Invalid destination type.",
277                input_value=type(destination).__name__,
278            )
279
280        api_util.delete_destination(
281            destination_id=(
282                destination if isinstance(destination, str) else destination.destination_id
283            ),
284            api_root=self.api_root,
285            client_id=self.client_id,
286            client_secret=self.client_secret,
287        )
288
289    # Deploy and delete connections
290
291    def deploy_connection(
292        self,
293        connection_name: str,
294        *,
295        source: CloudSource | str,
296        selected_streams: list[str],
297        destination: CloudDestination | str,
298        table_prefix: str | None = None,
299    ) -> CloudConnection:
300        """Create a new connection between an already deployed source and destination.
301
302        Returns the newly deployed connection object.
303
304        Args:
305            connection_name: The name of the connection.
306            source: The deployed source. You can pass a source ID or a CloudSource object.
307            destination: The deployed destination. You can pass a destination ID or a
308                CloudDestination object.
309            table_prefix: Optional. The table prefix to use when syncing to the destination.
310            selected_streams: The selected stream names to sync within the connection.
311        """
312        if not selected_streams:
313            raise exc.PyAirbyteInputError(
314                guidance="You must provide `selected_streams` when creating a connection."
315            )
316
317        source_id: str = source if isinstance(source, str) else source.connector_id
318        destination_id: str = (
319            destination if isinstance(destination, str) else destination.connector_id
320        )
321
322        deployed_connection = api_util.create_connection(
323            name=connection_name,
324            source_id=source_id,
325            destination_id=destination_id,
326            api_root=self.api_root,
327            workspace_id=self.workspace_id,
328            selected_stream_names=selected_streams,
329            prefix=table_prefix or "",
330            client_id=self.client_id,
331            client_secret=self.client_secret,
332        )
333
334        return CloudConnection(
335            workspace=self,
336            connection_id=deployed_connection.connection_id,
337            source=deployed_connection.source_id,
338            destination=deployed_connection.destination_id,
339        )
340
341    def permanently_delete_connection(
342        self,
343        connection: str | CloudConnection,
344        *,
345        cascade_delete_source: bool = False,
346        cascade_delete_destination: bool = False,
347    ) -> None:
348        """Delete a deployed connection from the workspace."""
349        if connection is None:
350            raise ValueError("No connection ID provided.")
351
352        if isinstance(connection, str):
353            connection = CloudConnection(
354                workspace=self,
355                connection_id=connection,
356            )
357
358        api_util.delete_connection(
359            connection_id=connection.connection_id,
360            api_root=self.api_root,
361            workspace_id=self.workspace_id,
362            client_id=self.client_id,
363            client_secret=self.client_secret,
364        )
365
366        if cascade_delete_source:
367            self.permanently_delete_source(source=connection.source_id)
368        if cascade_delete_destination:
369            self.permanently_delete_destination(destination=connection.destination_id)
370
371    # List sources, destinations, and connections
372
373    def list_connections(
374        self,
375        name: str | None = None,
376        *,
377        name_filter: Callable | None = None,
378    ) -> list[CloudConnection]:
379        """List connections by name in the workspace.
380
381        TODO: Add pagination support
382        """
383        connections = api_util.list_connections(
384            api_root=self.api_root,
385            workspace_id=self.workspace_id,
386            name=name,
387            name_filter=name_filter,
388            client_id=self.client_id,
389            client_secret=self.client_secret,
390        )
391        return [
392            CloudConnection(
393                workspace=self,
394                connection_id=connection.connection_id,
395                source=None,
396                destination=None,
397            )
398            for connection in connections
399            if name is None or connection.name == name
400        ]
401
402    def list_sources(
403        self,
404        name: str | None = None,
405        *,
406        name_filter: Callable | None = None,
407    ) -> list[CloudSource]:
408        """List all sources in the workspace.
409
410        TODO: Add pagination support
411        """
412        sources = api_util.list_sources(
413            api_root=self.api_root,
414            workspace_id=self.workspace_id,
415            name=name,
416            name_filter=name_filter,
417            client_id=self.client_id,
418            client_secret=self.client_secret,
419        )
420        return [
421            CloudSource(
422                workspace=self,
423                connector_id=source.source_id,
424            )
425            for source in sources
426            if name is None or source.name == name
427        ]
428
429    def list_destinations(
430        self,
431        name: str | None = None,
432        *,
433        name_filter: Callable | None = None,
434    ) -> list[CloudDestination]:
435        """List all destinations in the workspace.
436
437        TODO: Add pagination support
438        """
439        destinations = api_util.list_destinations(
440            api_root=self.api_root,
441            workspace_id=self.workspace_id,
442            name=name,
443            name_filter=name_filter,
444            client_id=self.client_id,
445            client_secret=self.client_secret,
446        )
447        return [
448            CloudDestination(
449                workspace=self,
450                connector_id=destination.destination_id,
451            )
452            for destination in destinations
453            if name is None or destination.name == name
454        ]
@dataclass
class CloudWorkspace:
 57@dataclass
 58class CloudWorkspace:
 59    """A remote workspace on the Airbyte Cloud.
 60
 61    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 62    instances, both OSS and Enterprise.
 63    """
 64
 65    workspace_id: str
 66    client_id: SecretString
 67    client_secret: SecretString
 68    api_root: str = api_util.CLOUD_API_ROOT
 69
 70    def __post_init__(self) -> None:
 71        """Ensure that the client ID and secret are handled securely."""
 72        self.client_id = SecretString(self.client_id)
 73        self.client_secret = SecretString(self.client_secret)
 74
 75    @property
 76    def workspace_url(self) -> str | None:
 77        """The web URL of the workspace."""
 78        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
 79
 80    # Test connection and creds
 81
 82    def connect(self) -> None:
 83        """Check that the workspace is reachable and raise an exception otherwise.
 84
 85        Note: It is not necessary to call this method before calling other operations. It
 86              serves primarily as a simple check to ensure that the workspace is reachable
 87              and credentials are correct.
 88        """
 89        _ = api_util.get_workspace(
 90            api_root=self.api_root,
 91            workspace_id=self.workspace_id,
 92            client_id=self.client_id,
 93            client_secret=self.client_secret,
 94        )
 95        print(f"Successfully connected to workspace: {self.workspace_url}")
 96
 97    # Get sources, destinations, and connections
 98
 99    def get_connection(
100        self,
101        connection_id: str,
102    ) -> CloudConnection:
103        """Get a connection by ID.
104
105        This method does not fetch data from the API. It returns a `CloudConnection` object,
106        which will be loaded lazily as needed.
107        """
108        return CloudConnection(
109            workspace=self,
110            connection_id=connection_id,
111        )
112
113    def get_source(
114        self,
115        source_id: str,
116    ) -> CloudSource:
117        """Get a source by ID.
118
119        This method does not fetch data from the API. It returns a `CloudSource` object,
120        which will be loaded lazily as needed.
121        """
122        return CloudSource(
123            workspace=self,
124            connector_id=source_id,
125        )
126
127    def get_destination(
128        self,
129        destination_id: str,
130    ) -> CloudDestination:
131        """Get a destination by ID.
132
133        This method does not fetch data from the API. It returns a `CloudDestination` object,
134        which will be loaded lazily as needed.
135        """
136        return CloudDestination(
137            workspace=self,
138            connector_id=destination_id,
139        )
140
141    # Deploy sources and destinations
142
143    def deploy_source(
144        self,
145        name: str,
146        source: Source,
147        *,
148        unique: bool = True,
149        random_name_suffix: bool = False,
150    ) -> CloudSource:
151        """Deploy a source to the workspace.
152
153        Returns the newly deployed source.
154
155        Args:
156            name: The name to use when deploying.
157            source: The source object to deploy.
158            unique: Whether to require a unique name. If `True`, duplicate names
159                are not allowed. Defaults to `True`.
160            random_name_suffix: Whether to append a random suffix to the name.
161        """
162        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
163        source_config_dict["sourceType"] = source.name.replace("source-", "")
164
165        if random_name_suffix:
166            name += f" (ID: {text_util.generate_random_suffix()})"
167
168        if unique:
169            existing = self.list_sources(name=name)
170            if existing:
171                raise exc.AirbyteDuplicateResourcesError(
172                    resource_type="source",
173                    resource_name=name,
174                )
175
176        deployed_source = api_util.create_source(
177            name=name,
178            api_root=self.api_root,
179            workspace_id=self.workspace_id,
180            config=source_config_dict,
181            client_id=self.client_id,
182            client_secret=self.client_secret,
183        )
184        return CloudSource(
185            workspace=self,
186            connector_id=deployed_source.source_id,
187        )
188
189    def deploy_destination(
190        self,
191        name: str,
192        destination: Destination | dict[str, Any],
193        *,
194        unique: bool = True,
195        random_name_suffix: bool = False,
196    ) -> CloudDestination:
197        """Deploy a destination to the workspace.
198
199        Returns the newly deployed destination ID.
200
201        Args:
202            name: The name to use when deploying.
203            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
204                dictionary of configuration values.
205            unique: Whether to require a unique name. If `True`, duplicate names
206                are not allowed. Defaults to `True`.
207            random_name_suffix: Whether to append a random suffix to the name.
208        """
209        if isinstance(destination, Destination):
210            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
211            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
212            # raise ValueError(destination_conf_dict)
213        else:
214            destination_conf_dict = destination.copy()
215            if "destinationType" not in destination_conf_dict:
216                raise exc.PyAirbyteInputError(
217                    message="Missing `destinationType` in configuration dictionary.",
218                )
219
220        if random_name_suffix:
221            name += f" (ID: {text_util.generate_random_suffix()})"
222
223        if unique:
224            existing = self.list_destinations(name=name)
225            if existing:
226                raise exc.AirbyteDuplicateResourcesError(
227                    resource_type="destination",
228                    resource_name=name,
229                )
230
231        deployed_destination = api_util.create_destination(
232            name=name,
233            api_root=self.api_root,
234            workspace_id=self.workspace_id,
235            config=destination_conf_dict,  # Wants a dataclass but accepts dict
236            client_id=self.client_id,
237            client_secret=self.client_secret,
238        )
239        return CloudDestination(
240            workspace=self,
241            connector_id=deployed_destination.destination_id,
242        )
243
244    def permanently_delete_source(
245        self,
246        source: str | CloudSource,
247    ) -> None:
248        """Delete a source from the workspace.
249
250        You can pass either the source ID `str` or a deployed `Source` object.
251        """
252        if not isinstance(source, (str, CloudSource)):
253            raise exc.PyAirbyteInputError(
254                message="Invalid source type.",
255                input_value=type(source).__name__,
256            )
257
258        api_util.delete_source(
259            source_id=source.connector_id if isinstance(source, CloudSource) else source,
260            api_root=self.api_root,
261            client_id=self.client_id,
262            client_secret=self.client_secret,
263        )
264
265    # Deploy and delete destinations
266
267    def permanently_delete_destination(
268        self,
269        destination: str | CloudDestination,
270    ) -> None:
271        """Delete a deployed destination from the workspace.
272
273        You can pass either the `Cache` class or the deployed destination ID as a `str`.
274        """
275        if not isinstance(destination, (str, CloudDestination)):
276            raise exc.PyAirbyteInputError(
277                message="Invalid destination type.",
278                input_value=type(destination).__name__,
279            )
280
281        api_util.delete_destination(
282            destination_id=(
283                destination if isinstance(destination, str) else destination.destination_id
284            ),
285            api_root=self.api_root,
286            client_id=self.client_id,
287            client_secret=self.client_secret,
288        )
289
290    # Deploy and delete connections
291
292    def deploy_connection(
293        self,
294        connection_name: str,
295        *,
296        source: CloudSource | str,
297        selected_streams: list[str],
298        destination: CloudDestination | str,
299        table_prefix: str | None = None,
300    ) -> CloudConnection:
301        """Create a new connection between an already deployed source and destination.
302
303        Returns the newly deployed connection object.
304
305        Args:
306            connection_name: The name of the connection.
307            source: The deployed source. You can pass a source ID or a CloudSource object.
308            destination: The deployed destination. You can pass a destination ID or a
309                CloudDestination object.
310            table_prefix: Optional. The table prefix to use when syncing to the destination.
311            selected_streams: The selected stream names to sync within the connection.
312        """
313        if not selected_streams:
314            raise exc.PyAirbyteInputError(
315                guidance="You must provide `selected_streams` when creating a connection."
316            )
317
318        source_id: str = source if isinstance(source, str) else source.connector_id
319        destination_id: str = (
320            destination if isinstance(destination, str) else destination.connector_id
321        )
322
323        deployed_connection = api_util.create_connection(
324            name=connection_name,
325            source_id=source_id,
326            destination_id=destination_id,
327            api_root=self.api_root,
328            workspace_id=self.workspace_id,
329            selected_stream_names=selected_streams,
330            prefix=table_prefix or "",
331            client_id=self.client_id,
332            client_secret=self.client_secret,
333        )
334
335        return CloudConnection(
336            workspace=self,
337            connection_id=deployed_connection.connection_id,
338            source=deployed_connection.source_id,
339            destination=deployed_connection.destination_id,
340        )
341
342    def permanently_delete_connection(
343        self,
344        connection: str | CloudConnection,
345        *,
346        cascade_delete_source: bool = False,
347        cascade_delete_destination: bool = False,
348    ) -> None:
349        """Delete a deployed connection from the workspace."""
350        if connection is None:
351            raise ValueError("No connection ID provided.")
352
353        if isinstance(connection, str):
354            connection = CloudConnection(
355                workspace=self,
356                connection_id=connection,
357            )
358
359        api_util.delete_connection(
360            connection_id=connection.connection_id,
361            api_root=self.api_root,
362            workspace_id=self.workspace_id,
363            client_id=self.client_id,
364            client_secret=self.client_secret,
365        )
366
367        if cascade_delete_source:
368            self.permanently_delete_source(source=connection.source_id)
369        if cascade_delete_destination:
370            self.permanently_delete_destination(destination=connection.destination_id)
371
372    # List sources, destinations, and connections
373
374    def list_connections(
375        self,
376        name: str | None = None,
377        *,
378        name_filter: Callable | None = None,
379    ) -> list[CloudConnection]:
380        """List connections by name in the workspace.
381
382        TODO: Add pagination support
383        """
384        connections = api_util.list_connections(
385            api_root=self.api_root,
386            workspace_id=self.workspace_id,
387            name=name,
388            name_filter=name_filter,
389            client_id=self.client_id,
390            client_secret=self.client_secret,
391        )
392        return [
393            CloudConnection(
394                workspace=self,
395                connection_id=connection.connection_id,
396                source=None,
397                destination=None,
398            )
399            for connection in connections
400            if name is None or connection.name == name
401        ]
402
403    def list_sources(
404        self,
405        name: str | None = None,
406        *,
407        name_filter: Callable | None = None,
408    ) -> list[CloudSource]:
409        """List all sources in the workspace.
410
411        TODO: Add pagination support
412        """
413        sources = api_util.list_sources(
414            api_root=self.api_root,
415            workspace_id=self.workspace_id,
416            name=name,
417            name_filter=name_filter,
418            client_id=self.client_id,
419            client_secret=self.client_secret,
420        )
421        return [
422            CloudSource(
423                workspace=self,
424                connector_id=source.source_id,
425            )
426            for source in sources
427            if name is None or source.name == name
428        ]
429
430    def list_destinations(
431        self,
432        name: str | None = None,
433        *,
434        name_filter: Callable | None = None,
435    ) -> list[CloudDestination]:
436        """List all destinations in the workspace.
437
438        TODO: Add pagination support
439        """
440        destinations = api_util.list_destinations(
441            api_root=self.api_root,
442            workspace_id=self.workspace_id,
443            name=name,
444            name_filter=name_filter,
445            client_id=self.client_id,
446            client_secret=self.client_secret,
447        )
448        return [
449            CloudDestination(
450                workspace=self,
451                connector_id=destination.destination_id,
452            )
453            for destination in destinations
454            if name is None or destination.name == name
455        ]

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
75    @property
76    def workspace_url(self) -> str | None:
77        """The web URL of the workspace."""
78        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def connect(self) -> None:
82    def connect(self) -> None:
83        """Check that the workspace is reachable and raise an exception otherwise.
84
85        Note: It is not necessary to call this method before calling other operations. It
86              serves primarily as a simple check to ensure that the workspace is reachable
87              and credentials are correct.
88        """
89        _ = api_util.get_workspace(
90            api_root=self.api_root,
91            workspace_id=self.workspace_id,
92            client_id=self.client_id,
93            client_secret=self.client_secret,
94        )
95        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
 99    def get_connection(
100        self,
101        connection_id: str,
102    ) -> CloudConnection:
103        """Get a connection by ID.
104
105        This method does not fetch data from the API. It returns a `CloudConnection` object,
106        which will be loaded lazily as needed.
107        """
108        return CloudConnection(
109            workspace=self,
110            connection_id=connection_id,
111        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
113    def get_source(
114        self,
115        source_id: str,
116    ) -> CloudSource:
117        """Get a source by ID.
118
119        This method does not fetch data from the API. It returns a `CloudSource` object,
120        which will be loaded lazily as needed.
121        """
122        return CloudSource(
123            workspace=self,
124            connector_id=source_id,
125        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
127    def get_destination(
128        self,
129        destination_id: str,
130    ) -> CloudDestination:
131        """Get a destination by ID.
132
133        This method does not fetch data from the API. It returns a `CloudDestination` object,
134        which will be loaded lazily as needed.
135        """
136        return CloudDestination(
137            workspace=self,
138            connector_id=destination_id,
139        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
143    def deploy_source(
144        self,
145        name: str,
146        source: Source,
147        *,
148        unique: bool = True,
149        random_name_suffix: bool = False,
150    ) -> CloudSource:
151        """Deploy a source to the workspace.
152
153        Returns the newly deployed source.
154
155        Args:
156            name: The name to use when deploying.
157            source: The source object to deploy.
158            unique: Whether to require a unique name. If `True`, duplicate names
159                are not allowed. Defaults to `True`.
160            random_name_suffix: Whether to append a random suffix to the name.
161        """
162        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
163        source_config_dict["sourceType"] = source.name.replace("source-", "")
164
165        if random_name_suffix:
166            name += f" (ID: {text_util.generate_random_suffix()})"
167
168        if unique:
169            existing = self.list_sources(name=name)
170            if existing:
171                raise exc.AirbyteDuplicateResourcesError(
172                    resource_type="source",
173                    resource_name=name,
174                )
175
176        deployed_source = api_util.create_source(
177            name=name,
178            api_root=self.api_root,
179            workspace_id=self.workspace_id,
180            config=source_config_dict,
181            client_id=self.client_id,
182            client_secret=self.client_secret,
183        )
184        return CloudSource(
185            workspace=self,
186            connector_id=deployed_source.source_id,
187        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
189    def deploy_destination(
190        self,
191        name: str,
192        destination: Destination | dict[str, Any],
193        *,
194        unique: bool = True,
195        random_name_suffix: bool = False,
196    ) -> CloudDestination:
197        """Deploy a destination to the workspace.
198
199        Returns the newly deployed destination ID.
200
201        Args:
202            name: The name to use when deploying.
203            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
204                dictionary of configuration values.
205            unique: Whether to require a unique name. If `True`, duplicate names
206                are not allowed. Defaults to `True`.
207            random_name_suffix: Whether to append a random suffix to the name.
208        """
209        if isinstance(destination, Destination):
210            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
211            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
212            # raise ValueError(destination_conf_dict)
213        else:
214            destination_conf_dict = destination.copy()
215            if "destinationType" not in destination_conf_dict:
216                raise exc.PyAirbyteInputError(
217                    message="Missing `destinationType` in configuration dictionary.",
218                )
219
220        if random_name_suffix:
221            name += f" (ID: {text_util.generate_random_suffix()})"
222
223        if unique:
224            existing = self.list_destinations(name=name)
225            if existing:
226                raise exc.AirbyteDuplicateResourcesError(
227                    resource_type="destination",
228                    resource_name=name,
229                )
230
231        deployed_destination = api_util.create_destination(
232            name=name,
233            api_root=self.api_root,
234            workspace_id=self.workspace_id,
235            config=destination_conf_dict,  # Wants a dataclass but accepts dict
236            client_id=self.client_id,
237            client_secret=self.client_secret,
238        )
239        return CloudDestination(
240            workspace=self,
241            connector_id=deployed_destination.destination_id,
242        )

Deploy a destination to the workspace.

Returns the newly deployed destination object.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
244    def permanently_delete_source(
245        self,
246        source: str | CloudSource,
247    ) -> None:
248        """Delete a source from the workspace.
249
250        You can pass either the source ID `str` or a deployed `Source` object.
251        """
252        if not isinstance(source, (str, CloudSource)):
253            raise exc.PyAirbyteInputError(
254                message="Invalid source type.",
255                input_value=type(source).__name__,
256            )
257
258        api_util.delete_source(
259            source_id=source.connector_id if isinstance(source, CloudSource) else source,
260            api_root=self.api_root,
261            client_id=self.client_id,
262            client_secret=self.client_secret,
263        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:
267    def permanently_delete_destination(
268        self,
269        destination: str | CloudDestination,
270    ) -> None:
271        """Delete a deployed destination from the workspace.
272
273        You can pass either the `Cache` class or the deployed destination ID as a `str`.
274        """
275        if not isinstance(destination, (str, CloudDestination)):
276            raise exc.PyAirbyteInputError(
277                message="Invalid destination type.",
278                input_value=type(destination).__name__,
279            )
280
281        api_util.delete_destination(
282            destination_id=(
283                destination if isinstance(destination, str) else destination.destination_id
284            ),
285            api_root=self.api_root,
286            client_id=self.client_id,
287            client_secret=self.client_secret,
288        )

Delete a deployed destination from the workspace.

You can pass either a deployed CloudDestination object or the deployed destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
292    def deploy_connection(
293        self,
294        connection_name: str,
295        *,
296        source: CloudSource | str,
297        selected_streams: list[str],
298        destination: CloudDestination | str,
299        table_prefix: str | None = None,
300    ) -> CloudConnection:
301        """Create a new connection between an already deployed source and destination.
302
303        Returns the newly deployed connection object.
304
305        Args:
306            connection_name: The name of the connection.
307            source: The deployed source. You can pass a source ID or a CloudSource object.
308            destination: The deployed destination. You can pass a destination ID or a
309                CloudDestination object.
310            table_prefix: Optional. The table prefix to use when syncing to the destination.
311            selected_streams: The selected stream names to sync within the connection.
312        """
313        if not selected_streams:
314            raise exc.PyAirbyteInputError(
315                guidance="You must provide `selected_streams` when creating a connection."
316            )
317
318        source_id: str = source if isinstance(source, str) else source.connector_id
319        destination_id: str = (
320            destination if isinstance(destination, str) else destination.connector_id
321        )
322
323        deployed_connection = api_util.create_connection(
324            name=connection_name,
325            source_id=source_id,
326            destination_id=destination_id,
327            api_root=self.api_root,
328            workspace_id=self.workspace_id,
329            selected_stream_names=selected_streams,
330            prefix=table_prefix or "",
331            client_id=self.client_id,
332            client_secret=self.client_secret,
333        )
334
335        return CloudConnection(
336            workspace=self,
337            connection_id=deployed_connection.connection_id,
338            source=deployed_connection.source_id,
339            destination=deployed_connection.destination_id,
340        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
342    def permanently_delete_connection(
343        self,
344        connection: str | CloudConnection,
345        *,
346        cascade_delete_source: bool = False,
347        cascade_delete_destination: bool = False,
348    ) -> None:
349        """Delete a deployed connection from the workspace."""
350        if connection is None:
351            raise ValueError("No connection ID provided.")
352
353        if isinstance(connection, str):
354            connection = CloudConnection(
355                workspace=self,
356                connection_id=connection,
357            )
358
359        api_util.delete_connection(
360            connection_id=connection.connection_id,
361            api_root=self.api_root,
362            workspace_id=self.workspace_id,
363            client_id=self.client_id,
364            client_secret=self.client_secret,
365        )
366
367        if cascade_delete_source:
368            self.permanently_delete_source(source=connection.source_id)
369        if cascade_delete_destination:
370            self.permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
374    def list_connections(
375        self,
376        name: str | None = None,
377        *,
378        name_filter: Callable | None = None,
379    ) -> list[CloudConnection]:
380        """List connections by name in the workspace.
381
382        TODO: Add pagination support
383        """
384        connections = api_util.list_connections(
385            api_root=self.api_root,
386            workspace_id=self.workspace_id,
387            name=name,
388            name_filter=name_filter,
389            client_id=self.client_id,
390            client_secret=self.client_secret,
391        )
392        return [
393            CloudConnection(
394                workspace=self,
395                connection_id=connection.connection_id,
396                source=None,
397                destination=None,
398            )
399            for connection in connections
400            if name is None or connection.name == name
401        ]

List connections by name in the workspace.

TODO: Add pagination support

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
403    def list_sources(
404        self,
405        name: str | None = None,
406        *,
407        name_filter: Callable | None = None,
408    ) -> list[CloudSource]:
409        """List all sources in the workspace.
410
411        TODO: Add pagination support
412        """
413        sources = api_util.list_sources(
414            api_root=self.api_root,
415            workspace_id=self.workspace_id,
416            name=name,
417            name_filter=name_filter,
418            client_id=self.client_id,
419            client_secret=self.client_secret,
420        )
421        return [
422            CloudSource(
423                workspace=self,
424                connector_id=source.source_id,
425            )
426            for source in sources
427            if name is None or source.name == name
428        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
430    def list_destinations(
431        self,
432        name: str | None = None,
433        *,
434        name_filter: Callable | None = None,
435    ) -> list[CloudDestination]:
436        """List all destinations in the workspace.
437
438        TODO: Add pagination support
439        """
440        destinations = api_util.list_destinations(
441            api_root=self.api_root,
442            workspace_id=self.workspace_id,
443            name=name,
444            name_filter=name_filter,
445            client_id=self.client_id,
446            client_secret=self.client_secret,
447        )
448        return [
449            CloudDestination(
450                workspace=self,
451                connector_id=destination.destination_id,
452            )
453            for destination in destinations
454            if name is None or destination.name == name
455        ]

List all destinations in the workspace.

TODO: Add pagination support