airbyte.cloud.experimental
Experimental features for interacting with the Airbyte Cloud API.
You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These features are subject to change and may not be available in all environments. Future versions of PyAirbyte may remove or change these features without notice.
To use this module, replace an import like this:
from airbyte.cloud import CloudConnection, CloudWorkspace
with an import like this:
from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
You can toggle between the stable and experimental versions of these classes by changing the import path. This allows you to test new features without requiring substantial changes to your codebase.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Experimental features for interacting with the Airbyte Cloud API.

You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These
features are subject to change and may not be available in all environments. **Future versions of
PyAirbyte may remove or change these features without notice.**

To use this module, replace an import like this:

```python
from airbyte.cloud import CloudConnection, CloudWorkspace
```

with an import like this:

```python
from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
```

You can toggle between the stable and experimental versions of these classes by changing the import
path. This allows you to test new features without requiring substantial changes to your codebase.

"""
# ruff: noqa: SLF001  # This file accesses private members of other classes.

from __future__ import annotations

import warnings

from airbyte import exceptions as exc
from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection
from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace


# This module is not imported anywhere by default, so this warning should only print if the user
# explicitly imports it.
warnings.warn(
    message="The `airbyte.cloud.experimental` module is experimental and may change in the future.",
    category=exc.AirbyteExperimentalFeatureWarning,
    stacklevel=2,
)


class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
    # The class docs are an "experimental" banner followed by the stable workspace's own
    # docstring. This must interpolate the *workspace* parent's docstring, not the connection's.
    __doc__ = (
        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudWorkspace.__doc__}"
    )

    # Re-export the stable class's non-public helpers under public (underscore-free) names.
    deploy_connection = Stable_CloudWorkspace._deploy_connection
    deploy_source = Stable_CloudWorkspace._deploy_source
    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination


class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
    # The class docs are an "experimental" banner followed by the stable connection's docstring.
    __doc__ = (
        f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}"
    )

    # Re-export the stable class's non-public delete helper under a public name.
    permanently_delete = Stable_CloudConnection._permanently_delete
class CloudWorkspace(Stable_CloudWorkspace):  # noqa: D101  # Docstring inherited from parent.
    # The class docs are an "experimental" banner followed by the stable workspace's own
    # docstring. This must interpolate the *workspace* parent's docstring, not the connection's.
    __doc__ = (
        f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudWorkspace.__doc__}"
    )

    # Re-export the stable class's non-public helpers under public (underscore-free) names.
    deploy_connection = Stable_CloudWorkspace._deploy_connection
    deploy_source = Stable_CloudWorkspace._deploy_source
    deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination
    permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection
    permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source
    permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
Experimental implementation of `.CloudWorkspace`.
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def _deploy_connection(
    self,
    source: Source | str,
    cache: CacheBase | None = None,
    destination: str | None = None,
    table_prefix: str | None = None,
    selected_streams: list[str] | None = None,
) -> CloudConnection:
    """Deploy a source and cache to the workspace as a new connection.

    Returns a `CloudConnection` object for the newly deployed connection.

    Args:
        source (Source | str): The source to deploy. You can pass either an already deployed
            source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
            it will be deployed automatically (unless it already carries a deployed source ID).
        cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
            `cache` or `destination`, but not both. If both are given, `destination` is used.
        destination (str, optional): The destination ID to use. You can provide
            `cache` or `destination`, but not both.
        table_prefix (str, optional): The table prefix to use for the cache. If not provided,
            the cache's table prefix will be used.
        selected_streams (list[str], optional): The selected stream names to use for the
            connection. If not provided, the source's selected streams will be used. Required
            when `source` is a source ID `str`.

    Raises:
        exc.PyAirbyteInputError: If `source` is an ID string and no `selected_streams` are
            given, or if neither `destination` nor `cache` is provided.
    """
    # Resolve source ID
    source_id: str
    if isinstance(source, Source):
        selected_streams = selected_streams or source.get_selected_streams()
        # Reuse a previous deployment of this source when one is recorded on the object.
        source_id = (
            source._deployed_source_id  # noqa: SLF001  # Access to non-public API
            or self._deploy_source(source)
        )
    else:
        source_id = source
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when deploying a source ID."
            )

    # Resolve destination ID (an explicit destination ID wins over a cache object)
    destination_id: str
    if destination:
        destination_id = destination
    elif cache:
        # An explicit `table_prefix` (even an empty string) overrides the cache's own prefix.
        table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
        if not cache._deployed_destination_id:  # noqa: SLF001
            destination_id = self._deploy_cache_as_destination(cache)
        else:
            destination_id = cache._deployed_destination_id  # noqa: SLF001
    else:
        raise exc.PyAirbyteInputError(
            guidance="You must provide either a destination ID or a cache object."
        )

    deployed_connection = create_connection(
        name="Connection (Deployed by PyAirbyte)",
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        api_key=self.api_key,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
    )

    # Record the deployment details on the local objects so later calls can reuse them.
    if isinstance(source, Source):
        source._deployed_api_root = self.api_root  # noqa: SLF001
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
        source._deployed_source_id = source_id  # noqa: SLF001
    # NOTE(review): this also runs when an explicit `destination` was passed together with a
    # cache, stamping that destination's ID onto the cache — confirm this is intended.
    if cache:
        cache._deployed_api_root = self.api_root  # noqa: SLF001
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
        cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

    return CloudConnection(
        workspace=self,
        connection_id=deployed_connection.connection_id,
        source=deployed_connection.source_id,
        destination=deployed_connection.destination_id,
    )
Deploy a source and cache to the workspace as a new connection.
Returns the newly deployed connection as a `CloudConnection` object.
Arguments:
- source (Source | str): The source to deploy. You can pass either an already deployed source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, it will be deployed automatically.
- cache (CacheBase, optional): The cache to deploy as a new destination. You can provide `cache` or `destination`, but not both.
- destination (str, optional): The destination ID to use. You can provide `cache` or `destination`, but not both.
- table_prefix (str, optional): The table prefix to use for the cache. If not provided, the cache's table prefix will be used.
- selected_streams (list[str], optional): The selected stream names to use for the connection. If not provided, the source's selected streams will be used.
def _deploy_source(
    self,
    source: Source,
) -> str:
    """Deploy a source to the workspace.

    The source's config is copied and tagged with a `sourceType` derived from the connector
    name (with the `source-` prefix removed). After creation, the deployment details are
    recorded on the `source` object.

    Returns the newly deployed source ID.
    """
    # Work on a copy so the caller's config object is left untouched.
    config_payload = source.get_config().copy()
    config_payload["sourceType"] = source.name.replace("source-", "")

    display_name = f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)"
    created = create_source(
        name=display_name,
        api_root=self.api_root,
        api_key=self.api_key,
        workspace_id=self.workspace_id,
        config=config_payload,
    )

    # Remember where this source was deployed (these are non-public attributes of `Source`).
    source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
    source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
    source._deployed_source_id = created.source_id  # noqa: SLF001  # Accessing non-public API

    return created.source_id
Deploy a source to the workspace.
Returns the newly deployed source ID.
def _deploy_cache_as_destination(
    self,
    cache: CacheBase,
) -> str:
    """Deploy a cache to the workspace as a new destination.

    The destination's display name is built from the cache's class name with "Cache" removed.
    After creation, the deployment details are recorded on the `cache` object.

    Returns the newly deployed destination ID.
    """
    type_label = cache.__class__.__name__.replace("Cache", "")
    display_name = f"Destination {type_label} (Deployed by PyAirbyte)"

    created: DestinationResponse = create_destination(
        name=display_name,
        api_root=self.api_root,
        api_key=self.api_key,
        workspace_id=self.workspace_id,
        config=get_destination_config_from_cache(cache),
    )

    # Remember where this cache was deployed (these are non-public attributes of the cache).
    cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
    cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
    cache._deployed_destination_id = created.destination_id  # noqa: SLF001  # Accessing non-public API

    return created.destination_id
Deploy a cache to the workspace as a new destination.
Returns the newly deployed destination ID.
def _permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    delete_source: bool = False,
    delete_destination: bool = False,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection ID `str` or a `CloudConnection` object to delete.
        delete_source: Whether to also delete the connection's source.
        delete_destination: Whether to also delete the connection's destination.

    Raises:
        ValueError: If `connection` is `None`.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")  # noqa: TRY003

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    # Capture the related IDs *before* deleting the connection, and only when requested.
    # NOTE(review): presumably these properties may need to look the IDs up from the
    # connection record, which would no longer exist after deletion — confirm.
    source_id = connection.source_id if delete_source else None
    destination_id = connection.destination_id if delete_destination else None

    delete_connection(
        connection_id=connection.connection_id,
        api_root=self.api_root,
        api_key=self.api_key,
        workspace_id=self.workspace_id,
    )

    if delete_source:
        self._permanently_delete_source(source=source_id)

    if delete_destination:
        self._permanently_delete_destination(destination=destination_id)
Delete a deployed connection from the workspace.
def _permanently_delete_source(
    self,
    source: str | Source,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID `str` or a deployed `Source` object.

    Raises:
        ValueError: If `source` is neither a `str` nor a `Source`, or if a `Source` object
            is passed that has no deployed source ID recorded on it.
    """
    # Plain `isinstance` checks per type (rather than `isinstance(x, str | Source)`) avoid
    # building a union at runtime, which is not supported on Python versions before 3.10.
    if isinstance(source, Source):
        if not source._deployed_source_id:  # noqa: SLF001
            raise ValueError("Source has not been deployed.")  # noqa: TRY003

        source_id = source._deployed_source_id  # noqa: SLF001
    elif isinstance(source, str):
        source_id = source
    else:
        raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

    delete_source(
        source_id=source_id,
        api_root=self.api_root,
        api_key=self.api_key,
    )
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `Source` object.
def _permanently_delete_destination(
    self,
    *,
    destination: str | None = None,
    cache: CacheBase | None = None,
) -> None:
    """Delete a deployed destination from the workspace.

    Exactly one of `destination` (the deployed destination ID as a `str`) or `cache`
    (a cache object that has already been deployed) must be given.

    Raises:
        ValueError: If neither or both arguments are given, if the cache has no deployed
            destination ID, or if no destination ID could be resolved.
    """
    has_destination = destination is not None
    has_cache = cache is not None

    if not (has_destination or has_cache):
        raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
    if has_destination and has_cache:
        raise ValueError(  # noqa: TRY003
            "You must provide either a destination ID or a cache object, not both."
        )

    if cache:
        deployed_id = cache._deployed_destination_id  # noqa: SLF001
        if not deployed_id:
            raise ValueError("Cache has not been deployed.")  # noqa: TRY003

        destination = deployed_id

    # Defensive check: still no ID at this point means there is nothing to delete.
    if destination is None:
        raise ValueError("No destination ID provided.")  # noqa: TRY003

    delete_destination(
        destination_id=destination,
        api_root=self.api_root,
        api_key=self.api_key,
    )
Delete a deployed destination from the workspace.
You can pass either the `Cache` class or the deployed destination ID as a `str`.
class CloudConnection(Stable_CloudConnection):  # noqa: D101  # Docstring inherited from parent.
    # The class docs are an "experimental" banner followed by the stable connection's docstring.
    __doc__ = (
        "Experimental implementation of `.CloudConnection`.\n\n"
        + f"{Stable_CloudConnection.__doc__}"
    )

    # Re-export the stable class's non-public delete helper under a public name.
    permanently_delete = Stable_CloudConnection._permanently_delete
Experimental implementation of `.CloudConnection`.
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def _permanently_delete(
    self,
    *,
    delete_source: bool = False,
    delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        delete_source: Whether to also delete the source.
        delete_destination: Whether to also delete the destination.
    """
    # Capture the related IDs *before* deleting the connection, and only when requested.
    # NOTE(review): presumably these properties may need to look the IDs up from the
    # connection record, which would no longer exist after deletion — confirm.
    source_id = self.source_id if delete_source else None
    destination_id = self.destination_id if delete_destination else None

    self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
        connection=self
    )

    if delete_source:
        self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
            source=source_id
        )

    if delete_destination:
        self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
            destination=destination_id,
        )
Delete the connection.
Arguments:
- delete_source: Whether to also delete the source.
- delete_destination: Whether to also delete the destination.