airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

Usage Examples

Get a new workspace object and deploy a source to it:

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6
  7## Usage Examples
  8
  9Get a new workspace object and deploy a source to it:
 10
 11```python
 12import airbyte as ab
 13from airbyte import cloud
 14
 15workspace = cloud.CloudWorkspace(
 16    workspace_id="...",
 17    client_id="...",
 18    client_secret="...",
 19)
 20
 21# Deploy a source to the workspace
 22source = ab.get_source("source-faker", config={"count": 100})
 23deployed_source = workspace.deploy_source(
 24    name="test-source",
 25    source=source,
 26)
 27
 28# Run a check on the deployed source and raise an exception if the check fails
 29check_result = deployed_source.check(raise_on_error=True)
 30
 31# Permanently delete the newly-created source
 32workspace.permanently_delete_source(deployed_source)
 33```
 34"""
 35
 36from __future__ import annotations
 37
 38from dataclasses import dataclass
 39from typing import TYPE_CHECKING, Any
 40
 41from airbyte import exceptions as exc
 42from airbyte._util import api_util, text_util
 43from airbyte.cloud.connections import CloudConnection
 44from airbyte.cloud.connectors import CloudDestination, CloudSource
 45from airbyte.destinations.base import Destination
 46from airbyte.secrets.base import SecretString
 47
 48
 49if TYPE_CHECKING:
 50    from collections.abc import Callable
 51
 52    from airbyte.sources.base import Source
 53
 54
 55@dataclass
 56class CloudWorkspace:
 57    """A remote workspace on the Airbyte Cloud.
 58
 59    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 60    instances, both OSS and Enterprise.
 61    """
 62
 63    workspace_id: str
 64    client_id: SecretString
 65    client_secret: SecretString
 66    api_root: str = api_util.CLOUD_API_ROOT
 67
 68    def __post_init__(self) -> None:
 69        """Ensure that the client ID and secret are handled securely."""
 70        self.client_id = SecretString(self.client_id)
 71        self.client_secret = SecretString(self.client_secret)
 72
 73    @property
 74    def workspace_url(self) -> str | None:
 75        """The URL of the workspace."""
 76        return f"{self.api_root}/workspaces/{self.workspace_id}"
 77
 78    # Test connection and creds
 79
 80    def connect(self) -> None:
 81        """Check that the workspace is reachable and raise an exception otherwise.
 82
 83        Note: It is not necessary to call this method before calling other operations. It
 84              serves primarily as a simple check to ensure that the workspace is reachable
 85              and credentials are correct.
 86        """
 87        _ = api_util.get_workspace(
 88            api_root=self.api_root,
 89            workspace_id=self.workspace_id,
 90            client_id=self.client_id,
 91            client_secret=self.client_secret,
 92        )
 93        print(f"Successfully connected to workspace: {self.workspace_url}")
 94
 95    # Get sources, destinations, and connections
 96
 97    def get_connection(
 98        self,
 99        connection_id: str,
100    ) -> CloudConnection:
101        """Get a connection by ID.
102
103        This method does not fetch data from the API. It returns a `CloudConnection` object,
104        which will be loaded lazily as needed.
105        """
106        return CloudConnection(
107            workspace=self,
108            connection_id=connection_id,
109        )
110
111    def get_source(
112        self,
113        source_id: str,
114    ) -> CloudSource:
115        """Get a source by ID.
116
117        This method does not fetch data from the API. It returns a `CloudSource` object,
118        which will be loaded lazily as needed.
119        """
120        return CloudSource(
121            workspace=self,
122            connector_id=source_id,
123        )
124
125    def get_destination(
126        self,
127        destination_id: str,
128    ) -> CloudDestination:
129        """Get a destination by ID.
130
131        This method does not fetch data from the API. It returns a `CloudDestination` object,
132        which will be loaded lazily as needed.
133        """
134        return CloudDestination(
135            workspace=self,
136            connector_id=destination_id,
137        )
138
139    # Deploy sources and destinations
140
141    def deploy_source(
142        self,
143        name: str,
144        source: Source,
145        *,
146        unique: bool = True,
147        random_name_suffix: bool = False,
148    ) -> CloudSource:
149        """Deploy a source to the workspace.
150
151        Returns the newly deployed source.
152
153        Args:
154            name: The name to use when deploying.
155            source: The source object to deploy.
156            unique: Whether to require a unique name. If `True`, duplicate names
157                are not allowed. Defaults to `True`.
158            random_name_suffix: Whether to append a random suffix to the name.
159        """
160        source_config_dict = source.get_config().copy()
161        source_config_dict["sourceType"] = source.name.replace("source-", "")
162
163        if random_name_suffix:
164            name += f" (ID: {text_util.generate_random_suffix()})"
165
166        if unique:
167            existing = self.list_sources(name=name)
168            if existing:
169                raise exc.AirbyteDuplicateResourcesError(
170                    resource_type="source",
171                    resource_name=name,
172                )
173
174        deployed_source = api_util.create_source(
175            name=name,
176            api_root=self.api_root,
177            workspace_id=self.workspace_id,
178            config=source_config_dict,
179            client_id=self.client_id,
180            client_secret=self.client_secret,
181        )
182        return CloudSource(
183            workspace=self,
184            connector_id=deployed_source.source_id,
185        )
186
187    def deploy_destination(
188        self,
189        name: str,
190        destination: Destination | dict[str, Any],
191        *,
192        unique: bool = True,
193        random_name_suffix: bool = False,
194    ) -> CloudDestination:
195        """Deploy a destination to the workspace.
196
197        Returns the newly deployed destination ID.
198
199        Args:
200            name: The name to use when deploying.
201            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
202                dictionary of configuration values.
203            unique: Whether to require a unique name. If `True`, duplicate names
204                are not allowed. Defaults to `True`.
205            random_name_suffix: Whether to append a random suffix to the name.
206        """
207        if isinstance(destination, Destination):
208            destination_conf_dict = destination.get_config().copy()
209            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
210            # raise ValueError(destination_conf_dict)
211        else:
212            destination_conf_dict = destination.copy()
213            if "destinationType" not in destination_conf_dict:
214                raise exc.PyAirbyteInputError(
215                    message="Missing `destinationType` in configuration dictionary.",
216                )
217
218        if random_name_suffix:
219            name += f" (ID: {text_util.generate_random_suffix()})"
220
221        if unique:
222            existing = self.list_destinations(name=name)
223            if existing:
224                raise exc.AirbyteDuplicateResourcesError(
225                    resource_type="destination",
226                    resource_name=name,
227                )
228
229        deployed_destination = api_util.create_destination(
230            name=name,
231            api_root=self.api_root,
232            workspace_id=self.workspace_id,
233            config=destination_conf_dict,  # Wants a dataclass but accepts dict
234            client_id=self.client_id,
235            client_secret=self.client_secret,
236        )
237        return CloudDestination(
238            workspace=self,
239            connector_id=deployed_destination.destination_id,
240        )
241
242    def permanently_delete_source(
243        self,
244        source: str | CloudSource,
245    ) -> None:
246        """Delete a source from the workspace.
247
248        You can pass either the source ID `str` or a deployed `Source` object.
249        """
250        if not isinstance(source, (str, CloudSource)):
251            raise exc.PyAirbyteInputError(
252                message="Invalid source type.",
253                input_value=type(source).__name__,
254            )
255
256        api_util.delete_source(
257            source_id=source.connector_id if isinstance(source, CloudSource) else source,
258            api_root=self.api_root,
259            client_id=self.client_id,
260            client_secret=self.client_secret,
261        )
262
263    # Deploy and delete destinations
264
265    def permanently_delete_destination(
266        self,
267        destination: str | CloudDestination,
268    ) -> None:
269        """Delete a deployed destination from the workspace.
270
271        You can pass either the `Cache` class or the deployed destination ID as a `str`.
272        """
273        if not isinstance(destination, (str, CloudDestination)):
274            raise exc.PyAirbyteInputError(
275                message="Invalid destination type.",
276                input_value=type(destination).__name__,
277            )
278
279        api_util.delete_destination(
280            destination_id=(
281                destination if isinstance(destination, str) else destination.destination_id
282            ),
283            api_root=self.api_root,
284            client_id=self.client_id,
285            client_secret=self.client_secret,
286        )
287
288    # Deploy and delete connections
289
290    def deploy_connection(
291        self,
292        connection_name: str,
293        *,
294        source: CloudSource | str,
295        selected_streams: list[str],
296        destination: CloudDestination | str,
297        table_prefix: str | None = None,
298    ) -> CloudConnection:
299        """Create a new connection between an already deployed source and destination.
300
301        Returns the newly deployed connection object.
302
303        Args:
304            connection_name: The name of the connection.
305            source: The deployed source. You can pass a source ID or a CloudSource object.
306            destination: The deployed destination. You can pass a destination ID or a
307                CloudDestination object.
308            table_prefix: Optional. The table prefix to use when syncing to the destination.
309            selected_streams: The selected stream names to sync within the connection.
310        """
311        if not selected_streams:
312            raise exc.PyAirbyteInputError(
313                guidance="You must provide `selected_streams` when creating a connection."
314            )
315
316        source_id: str = source if isinstance(source, str) else source.connector_id
317        destination_id: str = (
318            destination if isinstance(destination, str) else destination.connector_id
319        )
320
321        deployed_connection = api_util.create_connection(
322            name=connection_name,
323            source_id=source_id,
324            destination_id=destination_id,
325            api_root=self.api_root,
326            workspace_id=self.workspace_id,
327            selected_stream_names=selected_streams,
328            prefix=table_prefix or "",
329            client_id=self.client_id,
330            client_secret=self.client_secret,
331        )
332
333        return CloudConnection(
334            workspace=self,
335            connection_id=deployed_connection.connection_id,
336            source=deployed_connection.source_id,
337            destination=deployed_connection.destination_id,
338        )
339
340    def permanently_delete_connection(
341        self,
342        connection: str | CloudConnection,
343        *,
344        cascade_delete_source: bool = False,
345        cascade_delete_destination: bool = False,
346    ) -> None:
347        """Delete a deployed connection from the workspace."""
348        if connection is None:
349            raise ValueError("No connection ID provided.")
350
351        if isinstance(connection, str):
352            connection = CloudConnection(
353                workspace=self,
354                connection_id=connection,
355            )
356
357        api_util.delete_connection(
358            connection_id=connection.connection_id,
359            api_root=self.api_root,
360            workspace_id=self.workspace_id,
361            client_id=self.client_id,
362            client_secret=self.client_secret,
363        )
364
365        if cascade_delete_source:
366            self.permanently_delete_source(source=connection.source_id)
367        if cascade_delete_destination:
368            self.permanently_delete_destination(destination=connection.destination_id)
369
370    # List sources, destinations, and connections
371
372    def list_connections(
373        self,
374        name: str | None = None,
375        *,
376        name_filter: Callable | None = None,
377    ) -> list[CloudConnection]:
378        """List connections by name in the workspace."""
379        connections = api_util.list_connections(
380            api_root=self.api_root,
381            workspace_id=self.workspace_id,
382            name=name,
383            name_filter=name_filter,
384            client_id=self.client_id,
385            client_secret=self.client_secret,
386        )
387        return [
388            CloudConnection(
389                workspace=self,
390                connection_id=connection.connection_id,
391                source=None,
392                destination=None,
393            )
394            for connection in connections
395            if name is None or connection.name == name
396        ]
397
398    def list_sources(
399        self,
400        name: str | None = None,
401        *,
402        name_filter: Callable | None = None,
403    ) -> list[CloudSource]:
404        """List all sources in the workspace."""
405        sources = api_util.list_sources(
406            api_root=self.api_root,
407            workspace_id=self.workspace_id,
408            name=name,
409            name_filter=name_filter,
410            client_id=self.client_id,
411            client_secret=self.client_secret,
412        )
413        return [
414            CloudSource(
415                workspace=self,
416                connector_id=source.source_id,
417            )
418            for source in sources
419            if name is None or source.name == name
420        ]
421
422    def list_destinations(
423        self,
424        name: str | None = None,
425        *,
426        name_filter: Callable | None = None,
427    ) -> list[CloudDestination]:
428        """List all destinations in the workspace."""
429        destinations = api_util.list_destinations(
430            api_root=self.api_root,
431            workspace_id=self.workspace_id,
432            name=name,
433            name_filter=name_filter,
434            client_id=self.client_id,
435            client_secret=self.client_secret,
436        )
437        return [
438            CloudDestination(
439                workspace=self,
440                connector_id=destination.destination_id,
441            )
442            for destination in destinations
443            if name is None or destination.name == name
444        ]
@dataclass
class CloudWorkspace:
 56@dataclass
 57class CloudWorkspace:
 58    """A remote workspace on the Airbyte Cloud.
 59
 60    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 61    instances, both OSS and Enterprise.
 62    """
 63
 64    workspace_id: str
 65    client_id: SecretString
 66    client_secret: SecretString
 67    api_root: str = api_util.CLOUD_API_ROOT
 68
 69    def __post_init__(self) -> None:
 70        """Ensure that the client ID and secret are handled securely."""
 71        self.client_id = SecretString(self.client_id)
 72        self.client_secret = SecretString(self.client_secret)
 73
 74    @property
 75    def workspace_url(self) -> str | None:
 76        """The URL of the workspace."""
 77        return f"{self.api_root}/workspaces/{self.workspace_id}"
 78
 79    # Test connection and creds
 80
 81    def connect(self) -> None:
 82        """Check that the workspace is reachable and raise an exception otherwise.
 83
 84        Note: It is not necessary to call this method before calling other operations. It
 85              serves primarily as a simple check to ensure that the workspace is reachable
 86              and credentials are correct.
 87        """
 88        _ = api_util.get_workspace(
 89            api_root=self.api_root,
 90            workspace_id=self.workspace_id,
 91            client_id=self.client_id,
 92            client_secret=self.client_secret,
 93        )
 94        print(f"Successfully connected to workspace: {self.workspace_url}")
 95
 96    # Get sources, destinations, and connections
 97
 98    def get_connection(
 99        self,
100        connection_id: str,
101    ) -> CloudConnection:
102        """Get a connection by ID.
103
104        This method does not fetch data from the API. It returns a `CloudConnection` object,
105        which will be loaded lazily as needed.
106        """
107        return CloudConnection(
108            workspace=self,
109            connection_id=connection_id,
110        )
111
112    def get_source(
113        self,
114        source_id: str,
115    ) -> CloudSource:
116        """Get a source by ID.
117
118        This method does not fetch data from the API. It returns a `CloudSource` object,
119        which will be loaded lazily as needed.
120        """
121        return CloudSource(
122            workspace=self,
123            connector_id=source_id,
124        )
125
126    def get_destination(
127        self,
128        destination_id: str,
129    ) -> CloudDestination:
130        """Get a destination by ID.
131
132        This method does not fetch data from the API. It returns a `CloudDestination` object,
133        which will be loaded lazily as needed.
134        """
135        return CloudDestination(
136            workspace=self,
137            connector_id=destination_id,
138        )
139
140    # Deploy sources and destinations
141
142    def deploy_source(
143        self,
144        name: str,
145        source: Source,
146        *,
147        unique: bool = True,
148        random_name_suffix: bool = False,
149    ) -> CloudSource:
150        """Deploy a source to the workspace.
151
152        Returns the newly deployed source.
153
154        Args:
155            name: The name to use when deploying.
156            source: The source object to deploy.
157            unique: Whether to require a unique name. If `True`, duplicate names
158                are not allowed. Defaults to `True`.
159            random_name_suffix: Whether to append a random suffix to the name.
160        """
161        source_config_dict = source.get_config().copy()
162        source_config_dict["sourceType"] = source.name.replace("source-", "")
163
164        if random_name_suffix:
165            name += f" (ID: {text_util.generate_random_suffix()})"
166
167        if unique:
168            existing = self.list_sources(name=name)
169            if existing:
170                raise exc.AirbyteDuplicateResourcesError(
171                    resource_type="source",
172                    resource_name=name,
173                )
174
175        deployed_source = api_util.create_source(
176            name=name,
177            api_root=self.api_root,
178            workspace_id=self.workspace_id,
179            config=source_config_dict,
180            client_id=self.client_id,
181            client_secret=self.client_secret,
182        )
183        return CloudSource(
184            workspace=self,
185            connector_id=deployed_source.source_id,
186        )
187
188    def deploy_destination(
189        self,
190        name: str,
191        destination: Destination | dict[str, Any],
192        *,
193        unique: bool = True,
194        random_name_suffix: bool = False,
195    ) -> CloudDestination:
196        """Deploy a destination to the workspace.
197
198        Returns the newly deployed destination ID.
199
200        Args:
201            name: The name to use when deploying.
202            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
203                dictionary of configuration values.
204            unique: Whether to require a unique name. If `True`, duplicate names
205                are not allowed. Defaults to `True`.
206            random_name_suffix: Whether to append a random suffix to the name.
207        """
208        if isinstance(destination, Destination):
209            destination_conf_dict = destination.get_config().copy()
210            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
211            # raise ValueError(destination_conf_dict)
212        else:
213            destination_conf_dict = destination.copy()
214            if "destinationType" not in destination_conf_dict:
215                raise exc.PyAirbyteInputError(
216                    message="Missing `destinationType` in configuration dictionary.",
217                )
218
219        if random_name_suffix:
220            name += f" (ID: {text_util.generate_random_suffix()})"
221
222        if unique:
223            existing = self.list_destinations(name=name)
224            if existing:
225                raise exc.AirbyteDuplicateResourcesError(
226                    resource_type="destination",
227                    resource_name=name,
228                )
229
230        deployed_destination = api_util.create_destination(
231            name=name,
232            api_root=self.api_root,
233            workspace_id=self.workspace_id,
234            config=destination_conf_dict,  # Wants a dataclass but accepts dict
235            client_id=self.client_id,
236            client_secret=self.client_secret,
237        )
238        return CloudDestination(
239            workspace=self,
240            connector_id=deployed_destination.destination_id,
241        )
242
243    def permanently_delete_source(
244        self,
245        source: str | CloudSource,
246    ) -> None:
247        """Delete a source from the workspace.
248
249        You can pass either the source ID `str` or a deployed `Source` object.
250        """
251        if not isinstance(source, (str, CloudSource)):
252            raise exc.PyAirbyteInputError(
253                message="Invalid source type.",
254                input_value=type(source).__name__,
255            )
256
257        api_util.delete_source(
258            source_id=source.connector_id if isinstance(source, CloudSource) else source,
259            api_root=self.api_root,
260            client_id=self.client_id,
261            client_secret=self.client_secret,
262        )
263
264    # Deploy and delete destinations
265
266    def permanently_delete_destination(
267        self,
268        destination: str | CloudDestination,
269    ) -> None:
270        """Delete a deployed destination from the workspace.
271
272        You can pass either the `Cache` class or the deployed destination ID as a `str`.
273        """
274        if not isinstance(destination, (str, CloudDestination)):
275            raise exc.PyAirbyteInputError(
276                message="Invalid destination type.",
277                input_value=type(destination).__name__,
278            )
279
280        api_util.delete_destination(
281            destination_id=(
282                destination if isinstance(destination, str) else destination.destination_id
283            ),
284            api_root=self.api_root,
285            client_id=self.client_id,
286            client_secret=self.client_secret,
287        )
288
289    # Deploy and delete connections
290
291    def deploy_connection(
292        self,
293        connection_name: str,
294        *,
295        source: CloudSource | str,
296        selected_streams: list[str],
297        destination: CloudDestination | str,
298        table_prefix: str | None = None,
299    ) -> CloudConnection:
300        """Create a new connection between an already deployed source and destination.
301
302        Returns the newly deployed connection object.
303
304        Args:
305            connection_name: The name of the connection.
306            source: The deployed source. You can pass a source ID or a CloudSource object.
307            destination: The deployed destination. You can pass a destination ID or a
308                CloudDestination object.
309            table_prefix: Optional. The table prefix to use when syncing to the destination.
310            selected_streams: The selected stream names to sync within the connection.
311        """
312        if not selected_streams:
313            raise exc.PyAirbyteInputError(
314                guidance="You must provide `selected_streams` when creating a connection."
315            )
316
317        source_id: str = source if isinstance(source, str) else source.connector_id
318        destination_id: str = (
319            destination if isinstance(destination, str) else destination.connector_id
320        )
321
322        deployed_connection = api_util.create_connection(
323            name=connection_name,
324            source_id=source_id,
325            destination_id=destination_id,
326            api_root=self.api_root,
327            workspace_id=self.workspace_id,
328            selected_stream_names=selected_streams,
329            prefix=table_prefix or "",
330            client_id=self.client_id,
331            client_secret=self.client_secret,
332        )
333
334        return CloudConnection(
335            workspace=self,
336            connection_id=deployed_connection.connection_id,
337            source=deployed_connection.source_id,
338            destination=deployed_connection.destination_id,
339        )
340
341    def permanently_delete_connection(
342        self,
343        connection: str | CloudConnection,
344        *,
345        cascade_delete_source: bool = False,
346        cascade_delete_destination: bool = False,
347    ) -> None:
348        """Delete a deployed connection from the workspace."""
349        if connection is None:
350            raise ValueError("No connection ID provided.")
351
352        if isinstance(connection, str):
353            connection = CloudConnection(
354                workspace=self,
355                connection_id=connection,
356            )
357
358        api_util.delete_connection(
359            connection_id=connection.connection_id,
360            api_root=self.api_root,
361            workspace_id=self.workspace_id,
362            client_id=self.client_id,
363            client_secret=self.client_secret,
364        )
365
366        if cascade_delete_source:
367            self.permanently_delete_source(source=connection.source_id)
368        if cascade_delete_destination:
369            self.permanently_delete_destination(destination=connection.destination_id)
370
371    # List sources, destinations, and connections
372
373    def list_connections(
374        self,
375        name: str | None = None,
376        *,
377        name_filter: Callable | None = None,
378    ) -> list[CloudConnection]:
379        """List connections by name in the workspace."""
380        connections = api_util.list_connections(
381            api_root=self.api_root,
382            workspace_id=self.workspace_id,
383            name=name,
384            name_filter=name_filter,
385            client_id=self.client_id,
386            client_secret=self.client_secret,
387        )
388        return [
389            CloudConnection(
390                workspace=self,
391                connection_id=connection.connection_id,
392                source=None,
393                destination=None,
394            )
395            for connection in connections
396            if name is None or connection.name == name
397        ]
398
399    def list_sources(
400        self,
401        name: str | None = None,
402        *,
403        name_filter: Callable | None = None,
404    ) -> list[CloudSource]:
405        """List all sources in the workspace."""
406        sources = api_util.list_sources(
407            api_root=self.api_root,
408            workspace_id=self.workspace_id,
409            name=name,
410            name_filter=name_filter,
411            client_id=self.client_id,
412            client_secret=self.client_secret,
413        )
414        return [
415            CloudSource(
416                workspace=self,
417                connector_id=source.source_id,
418            )
419            for source in sources
420            if name is None or source.name == name
421        ]
422
423    def list_destinations(
424        self,
425        name: str | None = None,
426        *,
427        name_filter: Callable | None = None,
428    ) -> list[CloudDestination]:
429        """List all destinations in the workspace."""
430        destinations = api_util.list_destinations(
431            api_root=self.api_root,
432            workspace_id=self.workspace_id,
433            name=name,
434            name_filter=name_filter,
435            client_id=self.client_id,
436            client_secret=self.client_secret,
437        )
438        return [
439            CloudDestination(
440                workspace=self,
441                connector_id=destination.destination_id,
442            )
443            for destination in destinations
444            if name is None or destination.name == name
445        ]

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
74    @property
75    def workspace_url(self) -> str | None:
76        """The URL of the workspace."""
77        return f"{self.api_root}/workspaces/{self.workspace_id}"

The URL of the workspace.

def connect(self) -> None:
81    def connect(self) -> None:
82        """Check that the workspace is reachable and raise an exception otherwise.
83
84        Note: It is not necessary to call this method before calling other operations. It
85              serves primarily as a simple check to ensure that the workspace is reachable
86              and credentials are correct.
87        """
88        _ = api_util.get_workspace(
89            api_root=self.api_root,
90            workspace_id=self.workspace_id,
91            client_id=self.client_id,
92            client_secret=self.client_secret,
93        )
94        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
 98    def get_connection(
 99        self,
100        connection_id: str,
101    ) -> CloudConnection:
102        """Get a connection by ID.
103
104        This method does not fetch data from the API. It returns a `CloudConnection` object,
105        which will be loaded lazily as needed.
106        """
107        return CloudConnection(
108            workspace=self,
109            connection_id=connection_id,
110        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
112    def get_source(
113        self,
114        source_id: str,
115    ) -> CloudSource:
116        """Get a source by ID.
117
118        This method does not fetch data from the API. It returns a `CloudSource` object,
119        which will be loaded lazily as needed.
120        """
121        return CloudSource(
122            workspace=self,
123            connector_id=source_id,
124        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
126    def get_destination(
127        self,
128        destination_id: str,
129    ) -> CloudDestination:
130        """Get a destination by ID.
131
132        This method does not fetch data from the API. It returns a `CloudDestination` object,
133        which will be loaded lazily as needed.
134        """
135        return CloudDestination(
136            workspace=self,
137            connector_id=destination_id,
138        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
142    def deploy_source(
143        self,
144        name: str,
145        source: Source,
146        *,
147        unique: bool = True,
148        random_name_suffix: bool = False,
149    ) -> CloudSource:
150        """Deploy a source to the workspace.
151
152        Returns the newly deployed source.
153
154        Args:
155            name: The name to use when deploying.
156            source: The source object to deploy.
157            unique: Whether to require a unique name. If `True`, duplicate names
158                are not allowed. Defaults to `True`.
159            random_name_suffix: Whether to append a random suffix to the name.
160        """
161        source_config_dict = source.get_config().copy()
162        source_config_dict["sourceType"] = source.name.replace("source-", "")
163
164        if random_name_suffix:
165            name += f" (ID: {text_util.generate_random_suffix()})"
166
167        if unique:
168            existing = self.list_sources(name=name)
169            if existing:
170                raise exc.AirbyteDuplicateResourcesError(
171                    resource_type="source",
172                    resource_name=name,
173                )
174
175        deployed_source = api_util.create_source(
176            name=name,
177            api_root=self.api_root,
178            workspace_id=self.workspace_id,
179            config=source_config_dict,
180            client_id=self.client_id,
181            client_secret=self.client_secret,
182        )
183        return CloudSource(
184            workspace=self,
185            connector_id=deployed_source.source_id,
186        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
188    def deploy_destination(
189        self,
190        name: str,
191        destination: Destination | dict[str, Any],
192        *,
193        unique: bool = True,
194        random_name_suffix: bool = False,
195    ) -> CloudDestination:
196        """Deploy a destination to the workspace.
197
198        Returns the newly deployed destination ID.
199
200        Args:
201            name: The name to use when deploying.
202            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
203                dictionary of configuration values.
204            unique: Whether to require a unique name. If `True`, duplicate names
205                are not allowed. Defaults to `True`.
206            random_name_suffix: Whether to append a random suffix to the name.
207        """
208        if isinstance(destination, Destination):
209            destination_conf_dict = destination.get_config().copy()
210            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
211            # raise ValueError(destination_conf_dict)
212        else:
213            destination_conf_dict = destination.copy()
214            if "destinationType" not in destination_conf_dict:
215                raise exc.PyAirbyteInputError(
216                    message="Missing `destinationType` in configuration dictionary.",
217                )
218
219        if random_name_suffix:
220            name += f" (ID: {text_util.generate_random_suffix()})"
221
222        if unique:
223            existing = self.list_destinations(name=name)
224            if existing:
225                raise exc.AirbyteDuplicateResourcesError(
226                    resource_type="destination",
227                    resource_name=name,
228                )
229
230        deployed_destination = api_util.create_destination(
231            name=name,
232            api_root=self.api_root,
233            workspace_id=self.workspace_id,
234            config=destination_conf_dict,  # Wants a dataclass but accepts dict
235            client_id=self.client_id,
236            client_secret=self.client_secret,
237        )
238        return CloudDestination(
239            workspace=self,
240            connector_id=deployed_destination.destination_id,
241        )

Deploy a destination to the workspace.

Returns the newly deployed destination ID.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
243    def permanently_delete_source(
244        self,
245        source: str | CloudSource,
246    ) -> None:
247        """Delete a source from the workspace.
248
249        You can pass either the source ID `str` or a deployed `Source` object.
250        """
251        if not isinstance(source, (str, CloudSource)):
252            raise exc.PyAirbyteInputError(
253                message="Invalid source type.",
254                input_value=type(source).__name__,
255            )
256
257        api_util.delete_source(
258            source_id=source.connector_id if isinstance(source, CloudSource) else source,
259            api_root=self.api_root,
260            client_id=self.client_id,
261            client_secret=self.client_secret,
262        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:
266    def permanently_delete_destination(
267        self,
268        destination: str | CloudDestination,
269    ) -> None:
270        """Delete a deployed destination from the workspace.
271
272        You can pass either the `Cache` class or the deployed destination ID as a `str`.
273        """
274        if not isinstance(destination, (str, CloudDestination)):
275            raise exc.PyAirbyteInputError(
276                message="Invalid destination type.",
277                input_value=type(destination).__name__,
278            )
279
280        api_util.delete_destination(
281            destination_id=(
282                destination if isinstance(destination, str) else destination.destination_id
283            ),
284            api_root=self.api_root,
285            client_id=self.client_id,
286            client_secret=self.client_secret,
287        )

Delete a deployed destination from the workspace.

You can pass either the Cache class or the deployed destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> airbyte.cloud.CloudConnection:
291    def deploy_connection(
292        self,
293        connection_name: str,
294        *,
295        source: CloudSource | str,
296        selected_streams: list[str],
297        destination: CloudDestination | str,
298        table_prefix: str | None = None,
299    ) -> CloudConnection:
300        """Create a new connection between an already deployed source and destination.
301
302        Returns the newly deployed connection object.
303
304        Args:
305            connection_name: The name of the connection.
306            source: The deployed source. You can pass a source ID or a CloudSource object.
307            destination: The deployed destination. You can pass a destination ID or a
308                CloudDestination object.
309            table_prefix: Optional. The table prefix to use when syncing to the destination.
310            selected_streams: The selected stream names to sync within the connection.
311        """
312        if not selected_streams:
313            raise exc.PyAirbyteInputError(
314                guidance="You must provide `selected_streams` when creating a connection."
315            )
316
317        source_id: str = source if isinstance(source, str) else source.connector_id
318        destination_id: str = (
319            destination if isinstance(destination, str) else destination.connector_id
320        )
321
322        deployed_connection = api_util.create_connection(
323            name=connection_name,
324            source_id=source_id,
325            destination_id=destination_id,
326            api_root=self.api_root,
327            workspace_id=self.workspace_id,
328            selected_stream_names=selected_streams,
329            prefix=table_prefix or "",
330            client_id=self.client_id,
331            client_secret=self.client_secret,
332        )
333
334        return CloudConnection(
335            workspace=self,
336            connection_id=deployed_connection.connection_id,
337            source=deployed_connection.source_id,
338            destination=deployed_connection.destination_id,
339        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
341    def permanently_delete_connection(
342        self,
343        connection: str | CloudConnection,
344        *,
345        cascade_delete_source: bool = False,
346        cascade_delete_destination: bool = False,
347    ) -> None:
348        """Delete a deployed connection from the workspace."""
349        if connection is None:
350            raise ValueError("No connection ID provided.")
351
352        if isinstance(connection, str):
353            connection = CloudConnection(
354                workspace=self,
355                connection_id=connection,
356            )
357
358        api_util.delete_connection(
359            connection_id=connection.connection_id,
360            api_root=self.api_root,
361            workspace_id=self.workspace_id,
362            client_id=self.client_id,
363            client_secret=self.client_secret,
364        )
365
366        if cascade_delete_source:
367            self.permanently_delete_source(source=connection.source_id)
368        if cascade_delete_destination:
369            self.permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.CloudConnection]:
373    def list_connections(
374        self,
375        name: str | None = None,
376        *,
377        name_filter: Callable | None = None,
378    ) -> list[CloudConnection]:
379        """List connections by name in the workspace."""
380        connections = api_util.list_connections(
381            api_root=self.api_root,
382            workspace_id=self.workspace_id,
383            name=name,
384            name_filter=name_filter,
385            client_id=self.client_id,
386            client_secret=self.client_secret,
387        )
388        return [
389            CloudConnection(
390                workspace=self,
391                connection_id=connection.connection_id,
392                source=None,
393                destination=None,
394            )
395            for connection in connections
396            if name is None or connection.name == name
397        ]

List connections by name in the workspace.

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
399    def list_sources(
400        self,
401        name: str | None = None,
402        *,
403        name_filter: Callable | None = None,
404    ) -> list[CloudSource]:
405        """List all sources in the workspace."""
406        sources = api_util.list_sources(
407            api_root=self.api_root,
408            workspace_id=self.workspace_id,
409            name=name,
410            name_filter=name_filter,
411            client_id=self.client_id,
412            client_secret=self.client_secret,
413        )
414        return [
415            CloudSource(
416                workspace=self,
417                connector_id=source.source_id,
418            )
419            for source in sources
420            if name is None or source.name == name
421        ]

List all sources in the workspace.

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
423    def list_destinations(
424        self,
425        name: str | None = None,
426        *,
427        name_filter: Callable | None = None,
428    ) -> list[CloudDestination]:
429        """List all destinations in the workspace."""
430        destinations = api_util.list_destinations(
431            api_root=self.api_root,
432            workspace_id=self.workspace_id,
433            name=name,
434            name_filter=name_filter,
435            client_id=self.client_id,
436            client_secret=self.client_secret,
437        )
438        return [
439            CloudDestination(
440                workspace=self,
441                connector_id=destination.destination_id,
442            )
443            for destination in destinations
444            if name is None or destination.name == name
445        ]

List all destinations in the workspace.