airbyte.cloud.experimental
Experimental features for interacting with the Airbyte Cloud API.
You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These features are subject to change and may not be available in all environments. Future versions of PyAirbyte may remove or change these features without notice.
To use this module, replace an import like this:
from airbyte.cloud import CloudConnection, CloudWorkspace
with an import like this:
from airbyte.cloud.experimental import CloudConnection, CloudWorkspace
You can toggle between the stable and experimental versions of these classes by changing the import path. This allows you to test new features without requiring substantial changes to your codebase.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Experimental features for interacting with the Airbyte Cloud API. 3 4You can use this module to access experimental features in Airbyte Cloud, OSS, and Enterprise. These 5features are subject to change and may not be available in all environments. **Future versions of 6PyAirbyte may remove or change these features without notice.** 7 8To use this module, replace an import like this: 9 10```python 11from airbyte.cloud import CloudConnection, CloudWorkspace 12``` 13 14with an import like this: 15 16```python 17from airbyte.cloud.experimental import CloudConnection, CloudWorkspace 18``` 19 20You can toggle between the stable and experimental versions of these classes by changing the import 21path. This allows you to test new features without requiring substantial changes to your codebase. 22 23""" 24# ruff: noqa: SLF001 # This file accesses private members of other classes. 25 26from __future__ import annotations 27 28import warnings 29 30from airbyte.cloud.connections import CloudConnection as Stable_CloudConnection 31from airbyte.cloud.workspaces import CloudWorkspace as Stable_CloudWorkspace 32 33 34# This module is not imported anywhere by default, so this warning should only print if the user 35# explicitly imports it. 36warnings.warn( 37 message="The `airbyte.cloud.experimental` module is experimental and may change in the future.", 38 category=FutureWarning, 39 stacklevel=2, 40) 41 42 43class CloudWorkspace(Stable_CloudWorkspace): 44 __doc__ = ( 45 f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}" 46 ) 47 deploy_connection = Stable_CloudWorkspace._deploy_connection 48 deploy_source = Stable_CloudWorkspace._deploy_source 49 deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination 50 permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection 51 permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source 52 permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination 53 54 55class CloudConnection(Stable_CloudConnection): 56 __doc__ = ( 57 f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}" 58 ) 59 permanently_delete = Stable_CloudConnection._permanently_delete
44class CloudWorkspace(Stable_CloudWorkspace): 45 __doc__ = ( 46 f"Experimental implementation of `.CloudWorkspace`.\n\n{Stable_CloudConnection.__doc__}" 47 ) 48 deploy_connection = Stable_CloudWorkspace._deploy_connection 49 deploy_source = Stable_CloudWorkspace._deploy_source 50 deploy_cache_as_destination = Stable_CloudWorkspace._deploy_cache_as_destination 51 permanently_delete_connection = Stable_CloudWorkspace._permanently_delete_connection 52 permanently_delete_source = Stable_CloudWorkspace._permanently_delete_source 53 permanently_delete_destination = Stable_CloudWorkspace._permanently_delete_destination
Experimental implementation of airbyte.cloud.CloudWorkspace
.
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
186 def _deploy_connection( 187 self, 188 source: Source | str, 189 cache: CacheBase | None = None, 190 destination: str | None = None, 191 table_prefix: str | None = None, 192 selected_streams: list[str] | None = None, 193 ) -> CloudConnection: 194 """Deploy a source and cache to the workspace as a new connection. 195 196 Returns the newly deployed connection ID as a `str`. 197 198 Args: 199 source (Source | str): The source to deploy. You can pass either an already deployed 200 source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, 201 it will be deployed automatically. 202 cache (CacheBase, optional): The cache to deploy as a new destination. You can provide 203 `cache` or `destination`, but not both. 204 destination (str, optional): The destination ID to use. You can provide 205 `cache` or `destination`, but not both. 206 """ 207 # Resolve source ID 208 source_id: str 209 if isinstance(source, Source): 210 selected_streams = selected_streams or source.get_selected_streams() 211 if source._deployed_source_id: # noqa: SLF001 212 source_id = source._deployed_source_id # noqa: SLF001 213 else: 214 source_id = self._deploy_source(source) 215 else: 216 source_id = source 217 if not selected_streams: 218 raise exc.PyAirbyteInputError( 219 guidance="You must provide `selected_streams` when deploying a source ID." 220 ) 221 222 # Resolve destination ID 223 destination_id: str 224 if destination: 225 destination_id = destination 226 elif cache: 227 table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "") 228 if not cache._deployed_destination_id: # noqa: SLF001 229 destination_id = self._deploy_cache_as_destination(cache) 230 else: 231 destination_id = cache._deployed_destination_id # noqa: SLF001 232 else: 233 raise exc.PyAirbyteInputError( 234 guidance="You must provide either a destination ID or a cache object." 235 ) 236 237 assert source_id is not None 238 assert destination_id is not None 239 240 deployed_connection = create_connection( 241 name="Connection (Deployed by PyAirbyte)", 242 source_id=source_id, 243 destination_id=destination_id, 244 api_root=self.api_root, 245 api_key=self.api_key, 246 workspace_id=self.workspace_id, 247 selected_stream_names=selected_streams, 248 prefix=table_prefix or "", 249 ) 250 251 if isinstance(source, Source): 252 source._deployed_api_root = self.api_root # noqa: SLF001 253 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 254 source._deployed_source_id = source_id # noqa: SLF001 255 if cache: 256 cache._deployed_api_root = self.api_root # noqa: SLF001 257 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 258 cache._deployed_destination_id = deployed_connection.destination_id # noqa: SLF001 259 260 return CloudConnection( 261 workspace=self, 262 connection_id=deployed_connection.connection_id, 263 source=deployed_connection.source_id, 264 destination=deployed_connection.destination_id, 265 )
Deploy a source and cache to the workspace as a new connection.
Returns the newly deployed connection ID as a str
.
Arguments:
- source (Source | str): The source to deploy. You can pass either an already deployed
source ID
str
or a PyAirbyteSource
object. If you pass aSource
object, it will be deployed automatically. - cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
cache
ordestination
, but not both. - destination (str, optional): The destination ID to use. You can provide
cache
ordestination
, but not both.
71 def _deploy_source( 72 self, 73 source: Source, 74 ) -> str: 75 """Deploy a source to the workspace. 76 77 Returns the newly deployed source ID. 78 """ 79 source_configuration = source.get_config().copy() 80 source_configuration["sourceType"] = source.name.replace("source-", "") 81 82 deployed_source = create_source( 83 name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", 84 api_root=self.api_root, 85 api_key=self.api_key, 86 workspace_id=self.workspace_id, 87 config=source_configuration, 88 ) 89 90 # Set the deployment Ids on the source object 91 source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 92 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 93 source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API 94 95 return deployed_source.source_id
Deploy a source to the workspace.
Returns the newly deployed source ID.
126 def _deploy_cache_as_destination( 127 self, 128 cache: CacheBase, 129 ) -> str: 130 """Deploy a cache to the workspace as a new destination. 131 132 Returns the newly deployed destination ID. 133 """ 134 cache_type_name = cache.__class__.__name__.replace("Cache", "") 135 136 deployed_destination: DestinationResponse = create_destination( 137 name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", 138 api_root=self.api_root, 139 api_key=self.api_key, 140 workspace_id=self.workspace_id, 141 config=get_destination_config_from_cache(cache), 142 ) 143 144 # Set the deployment Ids on the source object 145 cache._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 146 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 147 cache._deployed_destination_id = deployed_destination.destination_id # noqa: SLF001 # Accessing nn-public API 148 149 return deployed_destination.destination_id
Deploy a cache to the workspace as a new destination.
Returns the newly deployed destination ID.
281 def _permanently_delete_connection( 282 self, 283 connection: str | CloudConnection, 284 *, 285 delete_source: bool = False, 286 delete_destination: bool = False, 287 ) -> None: 288 """Delete a deployed connection from the workspace.""" 289 if connection is None: 290 raise ValueError("No connection ID provided.") # noqa: TRY003 291 292 if isinstance(connection, str): 293 connection = CloudConnection( 294 workspace=self, 295 connection_id=connection, 296 ) 297 298 delete_connection( 299 connection_id=connection.connection_id, 300 api_root=self.api_root, 301 api_key=self.api_key, 302 workspace_id=self.workspace_id, 303 ) 304 if delete_source: 305 self._permanently_delete_source(source=connection.source_id) 306 307 if delete_destination: 308 self._permanently_delete_destination(destination=connection.destination_id)
Delete a deployed connection from the workspace.
97 def _permanently_delete_source( 98 self, 99 source: str | Source, 100 ) -> None: 101 """Delete a source from the workspace. 102 103 You can pass either the source ID `str` or a deployed `Source` object. 104 """ 105 if not isinstance(source, (str, Source)): 106 raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 107 108 if isinstance(source, Source): 109 if not source._deployed_source_id: # noqa: SLF001 110 raise ValueError("Source has not been deployed.") # noqa: TRY003 111 112 source_id = source._deployed_source_id # noqa: SLF001 113 114 elif isinstance(source, str): 115 source_id = source 116 117 delete_source( 118 source_id=source_id, 119 api_root=self.api_root, 120 api_key=self.api_key, 121 )
Delete a source from the workspace.
You can pass either the source ID str
or a deployed Source
object.
151 def _permanently_delete_destination( 152 self, 153 *, 154 destination: str | None = None, 155 cache: CacheBase | None = None, 156 ) -> None: 157 """Delete a deployed destination from the workspace. 158 159 You can pass either the `Cache` class or the deployed destination ID as a `str`. 160 """ 161 if destination is None and cache is None: 162 raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 163 if destination is not None and cache is not None: 164 raise ValueError( # noqa: TRY003 165 "You must provide either a destination ID or a cache object, not both." 166 ) 167 168 if cache: 169 if not cache._deployed_destination_id: # noqa: SLF001 170 raise ValueError("Cache has not been deployed.") # noqa: TRY003 171 172 destination = cache._deployed_destination_id # noqa: SLF001 173 174 if destination is None: 175 raise ValueError("No destination ID provided.") # noqa: TRY003 176 177 delete_destination( 178 destination_id=destination, 179 api_root=self.api_root, 180 api_key=self.api_key, 181 )
Delete a deployed destination from the workspace.
You can pass either the Cache
class or the deployed destination ID as a str
.
56class CloudConnection(Stable_CloudConnection): 57 __doc__ = ( 58 f"Experimental implementation of `.CloudConnection`.\n\n{Stable_CloudConnection.__doc__}" 59 ) 60 permanently_delete = Stable_CloudConnection._permanently_delete
Experimental implementation of airbyte.cloud.CloudConnection
.
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
191 def _permanently_delete( 192 self, 193 *, 194 delete_source: bool = False, 195 delete_destination: bool = False, 196 ) -> None: 197 """Delete the connection. 198 199 Args: 200 delete_source: Whether to also delete the source. 201 delete_destination: Whether to also delete the destination. 202 """ 203 self.workspace._permanently_delete_connection( # noqa: SLF001 # Non-public API (for now) 204 connection=self 205 ) 206 207 if delete_source: 208 self.workspace._permanently_delete_source( # noqa: SLF001 # Non-public API (for now) 209 source=self.source_id 210 ) 211 212 if delete_destination: 213 self.workspace._permanently_delete_destination( # noqa: SLF001 # Non-public API 214 destination=self.destination_id, 215 )
Delete the connection.
Arguments:
- delete_source: Whether to also delete the source.
- delete_destination: Whether to also delete the destination.