airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.
Usage Examples
Get a new workspace object and deploy a source to it:
```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)
```
A remote workspace on the Airbyte Cloud.

By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
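For example, a minimal sketch of constructing a workspace client. The IDs and credentials are placeholders, and the commented-out `api_root` override is only needed for self-managed instances; the exact URL shown is hypothetical and depends on your deployment:

```python
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
    # api_root="https://airbyte.example.com/api/public/v1",  # hypothetical self-managed API root
)
```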
```python
    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
```
The web URL of the workspace.
```python
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
            serves primarily as a simple check to ensure that the workspace is reachable
            and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
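Assuming `workspace` was constructed as in the sketch above, a quick credentials check looks like this:

```python
# Raises an exception if the workspace is unreachable or the credentials are wrong;
# on success it prints the workspace URL.
workspace.connect()
```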
```python
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
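A minimal sketch, assuming `workspace` from above and a placeholder connection ID:

```python
# No API call is made here; the returned object loads its details lazily.
connection = workspace.get_connection(connection_id="...")
print(connection.connection_id)
```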
```python
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )
```
Get a source by ID.
This method does not fetch data from the API. It returns a `CloudSource` object, which will be loaded lazily as needed.
```python
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )
```
Get a destination by ID.
This method does not fetch data from the API. It returns a `CloudDestination` object, which will be loaded lazily as needed.
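Both getters follow the same lazy pattern; a short sketch with placeholder IDs:

```python
# Neither call hits the API until the objects are actually used.
source = workspace.get_source(source_id="...")
destination = workspace.get_destination(destination_id="...")
print(source.connector_id, destination.connector_id)
```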
```python
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
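A sketch of a typical deployment, reusing the `source-faker` example from the module docstring; the deployment name is a placeholder, and with `unique=True` (the default) a duplicate name raises `AirbyteDuplicateResourcesError`:

```python
import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="my-faker-source",   # placeholder name
    source=source,
    random_name_suffix=True,  # appends " (ID: <random suffix>)" to avoid name collisions
)
print(deployed_source.connector_id)
```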
```python
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
            # raise ValueError(destination_conf_dict)
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
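Because a plain dictionary is accepted, you can deploy without building a local `Destination` object. In this sketch the `destinationType` value and the connector-specific field are illustrative only; omitting `destinationType` from a dictionary config raises `PyAirbyteInputError`:

```python
deployed_destination = workspace.deploy_destination(
    name="my-destination",  # placeholder name
    destination={
        "destinationType": "duckdb",               # illustrative connector type
        "destination_path": "/local/demo.duckdb",  # illustrative connector-specific field
    },
)
```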
```python
    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
```
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `Source` object.
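For example, assuming `deployed_source` is the `CloudSource` returned by `deploy_source` above:

```python
# Either form works: pass the CloudSource object or its ID string.
workspace.permanently_delete_source(deployed_source)
```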
```python
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the deployed
        destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
```
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the deployed destination ID as a `str`.
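Destinations are removed the same way; this sketch uses a placeholder ID string:

```python
# Anything other than a str or CloudDestination raises PyAirbyteInputError.
workspace.permanently_delete_destination("...")  # placeholder destination ID
```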
```python
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
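Putting it together, a sketch that wires the source and destination deployed above into a connection; the connection name, stream names, and table prefix are placeholders for whatever your source actually emits:

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",       # placeholder name
    source=deployed_source,                  # or a source ID string
    destination=deployed_destination,        # or a destination ID string
    selected_streams=["users", "products"],  # placeholder stream names
    table_prefix="demo_",                    # optional prefix for destination tables
)
print(connection.connection_id)
```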
```python
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)
```
Delete a deployed connection from the workspace.
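A sketch of cleaning up the connection created above, including the optional cascade flags:

```python
workspace.permanently_delete_connection(
    connection,                       # or a connection ID string
    cascade_delete_source=True,       # also delete the connection's source
    cascade_delete_destination=True,  # also delete the connection's destination
)
```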
```python
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]
```
List connections by name in the workspace.
TODO: Add pagination support
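For example, filtering by exact name (the name is a placeholder):

```python
connections = workspace.list_connections(name="faker-to-duckdb")
for connection in connections:
    print(connection.connection_id)
```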
```python
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]
```
List all sources in the workspace.
TODO: Add pagination support
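`name_filter` accepts a callable; a sketch of a prefix search, assuming the callable receives the connector name as a string:

```python
# Assumption: the filter callable is applied to each source's name.
faker_sources = workspace.list_sources(
    name_filter=lambda name: name.startswith("faker"),
)
print([source.connector_id for source in faker_sources])
```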
```python
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]
```
List all destinations in the workspace.
TODO: Add pagination support
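Listing destinations follows the same pattern; called with no arguments, it returns every destination in the workspace:

```python
destinations = workspace.list_destinations()
print(f"{len(destinations)} destination(s) deployed in this workspace")
```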