airbyte.cloud.experimental

Experimental features for interacting with the Airbyte Cloud API.

You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These features are subject to change and may not be available in all environments. Future versions of PyAirbyte may remove or change these features without notice.

To use this module, replace an import like this:

from airbyte.cloud import CloudConnection, CloudWorkspace

with an import like this:

from airbyte.cloud.experimental import CloudConnection, CloudWorkspace

You can toggle between the stable and experimental versions of these classes by changing the import path. This allows you to test new features without requiring substantial changes to your codebase.

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""Experimental features for interacting with the Airbyte Cloud API.
 3
 4You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These
 5features are subject to change and may not be available in all environments. **Future versions of
 6PyAirbyte may remove or change these features without notice.**
 7
 8To use this module, replace an import like this:
 9
10```python
11from airbyte.cloud import CloudConnection, CloudWorkspace
12```
13
14with an import like this:
15
16```python
17from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
18```
19
20You can toggle between the stable and experimental versions of these classes by changing the import
21path. This allows you to test new features without requiring substantial changes to your codebase.
22
23"""
24# ruff: noqa: SLF001  # This file accesses private members of other classes.
25
26from __future__ import annotations
27
28import warnings
29
30from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection
31from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace
32
33
34# This module is not imported anywhere by default, so this warning should only print if the user
35# explicitly imports it.
36warnings.warn(
37    message="The `airbyte.cloud.experimental` module is experimental and may change in the future.",
38    category=FutureWarning,
39    stacklevel=2,
40)
41
42
43class CloudWorkspace(Stable_CloudWorkspace):
44    __doc__ = (
45        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}"
46    )
47    deploy_connection = Stable_CloudWorkspace._deploy_connection
48    deploy_source = Stable_CloudWorkspace._deploy_source
49    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
50    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
51    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
52    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
53
54
55class CloudConnection(Stable_CloudConnection):
56    __doc__ = (
57        f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
58    )
59    permanently_delete = Stable_CloudConnection._permanently_delete
class CloudWorkspace(airbyte.cloud.workspaces.CloudWorkspace):
44class CloudWorkspace(Stable_CloudWorkspace):
45    __doc__ = (
46        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}"
47    )
48    deploy_connection = Stable_CloudWorkspace._deploy_connection
49    deploy_source = Stable_CloudWorkspace._deploy_source
50    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
51    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
52    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
53    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination

Experimental implementation of airbyte.cloud.CloudWorkspace.

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def deploy_connection( self, source: airbyte.sources.base.Source | str, cache: airbyte.caches.base.CacheBase | None = None, destination: str | None = None, table_prefix: str | None = None, selected_streams: list[str] | None = None) -> airbyte.cloud.connections.CloudConnection:
186    def _deploy_connection(
187        self,
188        source: Source | str,
189        cache: CacheBase | None = None,
190        destination: str | None = None,
191        table_prefix: str | None = None,
192        selected_streams: list[str] | None = None,
193    ) -> CloudConnection:
194        """Deploy a source and cache to the workspace as a new connection.
195
196        Returns the newly deployed connection ID as a `str`.
197
198        Args:
199            source (Source | str): The source to deploy. You can pass either an already deployed
200                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
201                it will be deployed automatically.
202            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
203                `cache` or `destination`, but not both.
204            destination (str, optional): The destination ID to use. You can provide
205                `cache` or `destination`, but not both.
206        """
207        # Resolve source ID
208        source_id: str
209        if isinstance(source, Source):
210            selected_streams = selected_streams or source.get_selected_streams()
211            if source._deployed_source_id:  # noqa: SLF001
212                source_id = source._deployed_source_id  # noqa: SLF001
213            else:
214                source_id = self._deploy_source(source)
215        else:
216            source_id = source
217            if not selected_streams:
218                raise exc.PyAirbyteInputError(
219                    guidance="You must provide `selected_streams` when deploying a source ID."
220                )
221
222        # Resolve destination ID
223        destination_id: str
224        if destination:
225            destination_id = destination
226        elif cache:
227            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
228            if not cache._deployed_destination_id:  # noqa: SLF001
229                destination_id = self._deploy_cache_as_destination(cache)
230            else:
231                destination_id = cache._deployed_destination_id  # noqa: SLF001
232        else:
233            raise exc.PyAirbyteInputError(
234                guidance="You must provide either a destination ID or a cache object."
235            )
236
237        assert source_id is not None
238        assert destination_id is not None
239
240        deployed_connection = create_connection(
241            name="Connection (Deployed by PyAirbyte)",
242            source_id=source_id,
243            destination_id=destination_id,
244            api_root=self.api_root,
245            api_key=self.api_key,
246            workspace_id=self.workspace_id,
247            selected_stream_names=selected_streams,
248            prefix=table_prefix or "",
249        )
250
251        if isinstance(source, Source):
252            source._deployed_api_root = self.api_root  # noqa: SLF001
253            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
254            source._deployed_source_id = source_id  # noqa: SLF001
255        if cache:
256            cache._deployed_api_root = self.api_root  # noqa: SLF001
257            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
258            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
259
260        return CloudConnection(
261            workspace=self,
262            connection_id=deployed_connection.connection_id,
263            source=deployed_connection.source_id,
264            destination=deployed_connection.destination_id,
265        )

Deploy a source and cache to the workspace as a new connection.

Returns the newly deployed connection ID as a str.

Arguments:
  • source (Source | str): The source to deploy. You can pass either an already deployed source ID str or a PyAirbyte Source object. If you pass a Source object, it will be deployed automatically.
  • cache (CacheBase, optional): The cache to deploy as a new destination. You can provide cache or destination, but not both.
  • destination (str, optional): The destination ID to use. You can provide cache or destination, but not both.
def deploy_source(self, source: airbyte.sources.base.Source) -> str:
71    def _deploy_source(
72        self,
73        source: Source,
74    ) -> str:
75        """Deploy a source to the workspace.
76
77        Returns the newly deployed source ID.
78        """
79        source_configuration = source.get_config().copy()
80        source_configuration["sourceType"] = source.name.replace("source-", "")
81
82        deployed_source = create_source(
83            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
84            api_root=self.api_root,
85            api_key=self.api_key,
86            workspace_id=self.workspace_id,
87            config=source_configuration,
88        )
89
90        # Set the deployment Ids on the source object
91        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
92        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
93        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
94
95        return deployed_source.source_id

Deploy a source to the workspace.

Returns the newly deployed source ID.

def deploy_cache_as_destination(self, cache: airbyte.caches.base.CacheBase) -> str:
126    def _deploy_cache_as_destination(
127        self,
128        cache: CacheBase,
129    ) -> str:
130        """Deploy a cache to the workspace as a new destination.
131
132        Returns the newly deployed destination ID.
133        """
134        cache_type_name = cache.__class__.__name__.replace("Cache", "")
135
136        deployed_destination: DestinationResponse = create_destination(
137            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
138            api_root=self.api_root,
139            api_key=self.api_key,
140            workspace_id=self.workspace_id,
141            config=get_destination_config_from_cache(cache),
142        )
143
144        # Set the deployment Ids on the source object
145        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
146        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
147        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
148
149        return deployed_destination.destination_id

Deploy a cache to the workspace as a new destination.

Returns the newly deployed destination ID.

def permanently_delete_connection( self, connection: str | airbyte.cloud.connections.CloudConnection, *, delete_source: bool = False, delete_destination: bool = False) -> None:
281    def _permanently_delete_connection(
282        self,
283        connection: str | CloudConnection,
284        *,
285        delete_source: bool = False,
286        delete_destination: bool = False,
287    ) -> None:
288        """Delete a deployed connection from the workspace."""
289        if connection is None:
290            raise ValueError("No connection ID provided.")  # noqa: TRY003
291
292        if isinstance(connection, str):
293            connection = CloudConnection(
294                workspace=self,
295                connection_id=connection,
296            )
297
298        delete_connection(
299            connection_id=connection.connection_id,
300            api_root=self.api_root,
301            api_key=self.api_key,
302            workspace_id=self.workspace_id,
303        )
304        if delete_source:
305            self._permanently_delete_source(source=connection.source_id)
306
307        if delete_destination:
308            self._permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def permanently_delete_source(self, source: str | airbyte.sources.base.Source) -> None:
 97    def _permanently_delete_source(
 98        self,
 99        source: str | Source,
100    ) -> None:
101        """Delete a source from the workspace.
102
103        You can pass either the source ID `str` or a deployed `Source` object.
104        """
105        if not isinstance(source, (str, Source)):
106            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
107
108        if isinstance(source, Source):
109            if not source._deployed_source_id:  # noqa: SLF001
110                raise ValueError("Source has not been deployed.")  # noqa: TRY003
111
112            source_id = source._deployed_source_id  # noqa: SLF001
113
114        elif isinstance(source, str):
115            source_id = source
116
117        delete_source(
118            source_id=source_id,
119            api_root=self.api_root,
120            api_key=self.api_key,
121        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed Source object.

def permanently_delete_destination( self, *, destination: str | None = None, cache: airbyte.caches.base.CacheBase | None = None) -> None:
151    def _permanently_delete_destination(
152        self,
153        *,
154        destination: str | None = None,
155        cache: CacheBase | None = None,
156    ) -> None:
157        """Delete a deployed destination from the workspace.
158
159        You can pass either the `Cache` class or the deployed destination ID as a `str`.
160        """
161        if destination is None and cache is None:
162            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
163        if destination is not None and cache is not None:
164            raise ValueError(  # noqa: TRY003
165                "You must provide either a destination ID or a cache object, not both."
166            )
167
168        if cache:
169            if not cache._deployed_destination_id:  # noqa: SLF001
170                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
171
172            destination = cache._deployed_destination_id  # noqa: SLF001
173
174        if destination is None:
175            raise ValueError("No destination ID provided.")  # noqa: TRY003
176
177        delete_destination(
178            destination_id=destination,
179            api_root=self.api_root,
180            api_key=self.api_key,
181        )

Delete a deployed destination from the workspace.

You can pass either the Cache class or the deployed destination ID as a str.

class CloudConnection(airbyte.cloud.connections.CloudConnection):
56class CloudConnection(Stable_CloudConnection):
57    __doc__ = (
58        f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
59    )
60    permanently_delete = Stable_CloudConnection._permanently_delete

Experimental implementation of airbyte.cloud.CloudConnection.

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def permanently_delete( self, *, delete_source: bool = False, delete_destination: bool = False) -> None:
191    def _permanently_delete(
192        self,
193        *,
194        delete_source: bool = False,
195        delete_destination: bool = False,
196    ) -> None:
197        """Delete the connection.
198
199        Args:
200            delete_source: Whether to also delete the source.
201            delete_destination: Whether to also delete the destination.
202        """
203        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
204            connection=self
205        )
206
207        if delete_source:
208            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
209                source=self.source_id
210            )
211
212        if delete_destination:
213            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
214                destination=self.destination_id,
215            )

Delete the connection.

Arguments:
  • delete_source: Whether to also delete the source.
  • delete_destination: Whether to also delete the destination.