airbyte.cloud.experimental

Experimental features for interacting with the Airbyte Cloud API.

You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These features are subject to change and may not be available in all environments. Future versions of PyAirbyte may remove or change these features without notice.

To use this module, replace an import like this:

from airbyte.cloud import CloudConnection, CloudWorkspace

with an import like this:

from airbyte.cloud.experimental import CloudConnection, CloudWorkspace

You can toggle between the stable and experimental versions of these classes by changing the import path. This allows you to test new features without requiring substantial changes to your codebase.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Experimental features for interacting with the Airbyte Cloud API.
 3
 4You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These
 5features are subject to change and may not be available in all environments. **Future versions of
 6PyAirbyte may remove or change these features without notice.**
 7
 8To use this module, replace an import like this:
 9
10```python
11from airbyte.cloud import CloudConnection, CloudWorkspace
12```
13
14with an import like this:
15
16```python
17from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
18```
19
20You can toggle between the stable and experimental versions of these classes by changing the import
21path. This allows you to test new features without requiring substantial changes to your codebase.
22
23"""
24# ruff: noqa: SLF001  # This file accesses private members of other classes.
25
26from __future__ import annotations
27
28import warnings
29
30from airbyte import exceptions as exc
31from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection
32from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace
33
34
# Nothing imports this module by default, so the warning below is only emitted when a user
# opts in by importing `airbyte.cloud.experimental` explicitly.
warnings.warn(
    "The `airbyte.cloud.experimental` module is experimental and may change in the future.",
    exc.AirbyteExperimentalFeatureWarning,
    stacklevel=2,
)
42
43
class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
    # The docstring is derived from the stable *workspace* class, so that the description
    # shown for this class is the workspace's own (not the connection's).
    __doc__ = (
        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudWorkspace.__doc__}"
    )

    # Expose the parent's non-public deploy/delete methods under public names.
    deploy_connection = Stable_CloudWorkspace._deploy_connection
    deploy_source = Stable_CloudWorkspace._deploy_source
    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
54
55
class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
    # Prefix the stable class's docstring with a line marking this variant as experimental.
    __doc__ = "Experimental implementation of `.CloudConnection`.\n\n" + str(
        Stable_CloudConnection.__doc__
    )

    # Expose the parent's non-public delete method under a public name.
    permanently_delete = Stable_CloudConnection._permanently_delete
class CloudWorkspace(airbyte.cloud.workspaces.CloudWorkspace):
class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
    # The docstring is derived from the stable *workspace* class, so that the description
    # shown for this class is the workspace's own (not the connection's).
    __doc__ = (
        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudWorkspace.__doc__}"
    )

    # Expose the parent's non-public deploy/delete methods under public names.
    deploy_connection = Stable_CloudWorkspace._deploy_connection
    deploy_source = Stable_CloudWorkspace._deploy_source
    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination

Experimental implementation of .CloudWorkspace.

A workspace in Airbyte Cloud.

This experimental variant exposes public methods for deploying sources, caches (as destinations), and connections to the workspace, and for permanently deleting them.
def deploy_connection( self, source: airbyte.Source | str, cache: airbyte.caches.CacheBase | None = None, destination: str | None = None, table_prefix: str | None = None, selected_streams: list[str] | None = None) -> airbyte.cloud.CloudConnection:
190    def _deploy_connection(
191        self,
192        source: Source | str,
193        cache: CacheBase | None = None,
194        destination: str | None = None,
195        table_prefix: str | None = None,
196        selected_streams: list[str] | None = None,
197    ) -> CloudConnection:
198        """Deploy a source and cache to the workspace as a new connection.
199
200        Returns the newly deployed connection ID as a `str`.
201
202        Args:
203            source (Source | str): The source to deploy. You can pass either an already deployed
204                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
205                it will be deployed automatically.
206            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
207                `cache` or `destination`, but not both.
208            destination (str, optional): The destination ID to use. You can provide
209                `cache` or `destination`, but not both.
210            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
211                the cache's table prefix will be used.
212            selected_streams (list[str], optional): The selected stream names to use for the
213                connection. If not provided, the source's selected streams will be used.
214        """
215        # Resolve source ID
216        source_id: str
217        if isinstance(source, Source):
218            selected_streams = selected_streams or source.get_selected_streams()
219            source_id = (
220                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
221                or self._deploy_source(source)
222            )
223        else:
224            source_id = source
225            if not selected_streams:
226                raise exc.PyAirbyteInputError(
227                    guidance="You must provide `selected_streams` when deploying a source ID."
228                )
229
230        # Resolve destination ID
231        destination_id: str
232        if destination:
233            destination_id = destination
234        elif cache:
235            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
236            if not cache._deployed_destination_id:  # noqa: SLF001
237                destination_id = self._deploy_cache_as_destination(cache)
238            else:
239                destination_id = cache._deployed_destination_id  # noqa: SLF001
240        else:
241            raise exc.PyAirbyteInputError(
242                guidance="You must provide either a destination ID or a cache object."
243            )
244
245        assert source_id is not None
246        assert destination_id is not None
247
248        deployed_connection = create_connection(
249            name="Connection (Deployed by PyAirbyte)",
250            source_id=source_id,
251            destination_id=destination_id,
252            api_root=self.api_root,
253            api_key=self.api_key,
254            workspace_id=self.workspace_id,
255            selected_stream_names=selected_streams,
256            prefix=table_prefix or "",
257        )
258
259        if isinstance(source, Source):
260            source._deployed_api_root = self.api_root  # noqa: SLF001
261            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
262            source._deployed_source_id = source_id  # noqa: SLF001
263        if cache:
264            cache._deployed_api_root = self.api_root  # noqa: SLF001
265            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
266            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
267
268        return CloudConnection(
269            workspace=self,
270            connection_id=deployed_connection.connection_id,
271            source=deployed_connection.source_id,
272            destination=deployed_connection.destination_id,
273        )

Deploy a source and cache to the workspace as a new connection.

Returns a CloudConnection object for the newly deployed connection.

Arguments:
  • source (Source | str): The source to deploy. You can pass either an already deployed source ID str or a PyAirbyte Source object. If you pass a Source object, it will be deployed automatically.
  • cache (CacheBase, optional): The cache to deploy as a new destination. You can provide cache or destination, but not both.
  • destination (str, optional): The destination ID to use. You can provide cache or destination, but not both.
  • table_prefix (str, optional): The table prefix to use for the cache. If not provided, the cache's table prefix will be used.
  • selected_streams (list[str], optional): The selected stream names to use for the connection. If not provided, the source's selected streams will be used.
def deploy_source(self, source: airbyte.Source) -> str:
73    def _deploy_source(
74        self,
75        source: Source,
76    ) -> str:
77        """Deploy a source to the workspace.
78
79        Returns the newly deployed source ID.
80        """
81        source_configuration = source.get_config().copy()
82        source_configuration["sourceType"] = source.name.replace("source-", "")
83
84        deployed_source = create_source(
85            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
86            api_root=self.api_root,
87            api_key=self.api_key,
88            workspace_id=self.workspace_id,
89            config=source_configuration,
90        )
91
92        # Set the deployment Ids on the source object
93        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
94        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
95        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
96
97        return deployed_source.source_id

Deploy a source to the workspace.

Returns the newly deployed source ID.

def deploy_cache_as_destination(self, cache: airbyte.caches.CacheBase) -> str:
129    def _deploy_cache_as_destination(
130        self,
131        cache: CacheBase,
132    ) -> str:
133        """Deploy a cache to the workspace as a new destination.
134
135        Returns the newly deployed destination ID.
136        """
137        cache_type_name = cache.__class__.__name__.replace("Cache", "")
138
139        deployed_destination: DestinationResponse = create_destination(
140            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
141            api_root=self.api_root,
142            api_key=self.api_key,
143            workspace_id=self.workspace_id,
144            config=get_destination_config_from_cache(cache),
145        )
146
147        # Set the deployment Ids on the source object
148        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
149        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
150        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
151
152        return deployed_destination.destination_id

Deploy a cache to the workspace as a new destination.

Returns the newly deployed destination ID.

def permanently_delete_connection( self, connection: str | airbyte.cloud.CloudConnection, *, delete_source: bool = False, delete_destination: bool = False) -> None:
289    def _permanently_delete_connection(
290        self,
291        connection: str | CloudConnection,
292        *,
293        delete_source: bool = False,
294        delete_destination: bool = False,
295    ) -> None:
296        """Delete a deployed connection from the workspace."""
297        if connection is None:
298            raise ValueError("No connection ID provided.")  # noqa: TRY003
299
300        if isinstance(connection, str):
301            connection = CloudConnection(
302                workspace=self,
303                connection_id=connection,
304            )
305
306        delete_connection(
307            connection_id=connection.connection_id,
308            api_root=self.api_root,
309            api_key=self.api_key,
310            workspace_id=self.workspace_id,
311        )
312        if delete_source:
313            self._permanently_delete_source(source=connection.source_id)
314
315        if delete_destination:
316            self._permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def permanently_delete_source(self, source: str | airbyte.Source) -> None:
 99    def _permanently_delete_source(
100        self,
101        source: str | Source,
102    ) -> None:
103        """Delete a source from the workspace.
104
105        You can pass either the source ID `str` or a deployed `Source` object.
106        """
107        if not isinstance(source, str | Source):
108            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
109
110        if isinstance(source, Source):
111            if not source._deployed_source_id:  # noqa: SLF001
112                raise ValueError("Source has not been deployed.")  # noqa: TRY003
113
114            source_id = source._deployed_source_id  # noqa: SLF001
115
116        elif isinstance(source, str):
117            source_id = source
118
119        delete_source(
120            source_id=source_id,
121            api_root=self.api_root,
122            api_key=self.api_key,
123        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

def permanently_delete_destination( self, *, destination: str | None = None, cache: airbyte.caches.CacheBase | None = None) -> None:
154    def _permanently_delete_destination(
155        self,
156        *,
157        destination: str | None = None,
158        cache: CacheBase | None = None,
159    ) -> None:
160        """Delete a deployed destination from the workspace.
161
162        You can pass either the `Cache` class or the deployed destination ID as a `str`.
163        """
164        if destination is None and cache is None:
165            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
166        if destination is not None and cache is not None:
167            raise ValueError(  # noqa: TRY003
168                "You must provide either a destination ID or a cache object, not both."
169            )
170
171        if cache:
172            if not cache._deployed_destination_id:  # noqa: SLF001
173                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
174
175            destination = cache._deployed_destination_id  # noqa: SLF001
176
177        if destination is None:
178            raise ValueError("No destination ID provided.")  # noqa: TRY003
179
180        delete_destination(
181            destination_id=destination,
182            api_root=self.api_root,
183            api_key=self.api_key,
184        )

Delete a deployed destination from the workspace.

You can pass either a deployed cache object or the deployed destination ID as a str, but not both.

class CloudConnection(airbyte.cloud.connections.CloudConnection):
class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
    # Prefix the stable class's docstring with a line marking this variant as experimental.
    __doc__ = "Experimental implementation of `.CloudConnection`.\n\n" + str(
        Stable_CloudConnection.__doc__
    )

    # Expose the parent's non-public delete method under a public name.
    permanently_delete = Stable_CloudConnection._permanently_delete

Experimental implementation of .CloudConnection.

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def permanently_delete( self, *, delete_source: bool = False, delete_destination: bool = False) -> None:
193    def _permanently_delete(
194        self,
195        *,
196        delete_source: bool = False,
197        delete_destination: bool = False,
198    ) -> None:
199        """Delete the connection.
200
201        Args:
202            delete_source: Whether to also delete the source.
203            delete_destination: Whether to also delete the destination.
204        """
205        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
206            connection=self
207        )
208
209        if delete_source:
210            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
211                source=self.source_id
212            )
213
214        if delete_destination:
215            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
216                destination=self.destination_id,
217            )

Delete the connection.

Arguments:
  • delete_source: Whether to also delete the source.
  • delete_destination: Whether to also delete the destination.