airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
Usage Examples
Get a new workspace object and deploy a source to it:
import airbyte as ab
from airbyte import cloud
workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)
# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)
# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API. 3 4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances, 5both OSS and Enterprise. 6 7## Usage Examples 8 9Get a new workspace object and deploy a source to it: 10 11```python 12import airbyte as ab 13from airbyte import cloud 14 15workspace = cloud.CloudWorkspace( 16 workspace_id="...", 17 client_id="...", 18 client_secret="...", 19) 20 21# Deploy a source to the workspace 22source = ab.get_source("source-faker", config={"count": 100}) 23deployed_source = workspace.deploy_source( 24 name="test-source", 25 source=source, 26) 27 28# Run a check on the deployed source and raise an exception if the check fails 29check_result = deployed_source.check(raise_on_error=True) 30 31# Permanently delete the newly-created source 32workspace.permanently_delete_source(deployed_source) 33``` 34""" 35 36from __future__ import annotations 37 38from dataclasses import dataclass 39from typing import TYPE_CHECKING, Any 40 41from airbyte import exceptions as exc 42from airbyte._util import api_util, text_util 43from airbyte.cloud.connections import CloudConnection 44from airbyte.cloud.connectors import CloudDestination, CloudSource 45from airbyte.destinations.base import Destination 46from airbyte.secrets.base import SecretString 47 48 49if TYPE_CHECKING: 50 from collections.abc import Callable 51 52 from airbyte.sources.base import Source 53 54 55@dataclass 56class CloudWorkspace: 57 """A remote workspace on the Airbyte Cloud. 58 59 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 60 instances, both OSS and Enterprise. 
61 """ 62 63 workspace_id: str 64 client_id: SecretString 65 client_secret: SecretString 66 api_root: str = api_util.CLOUD_API_ROOT 67 68 def __post_init__(self) -> None: 69 """Ensure that the client ID and secret are handled securely.""" 70 self.client_id = SecretString(self.client_id) 71 self.client_secret = SecretString(self.client_secret) 72 73 @property 74 def workspace_url(self) -> str | None: 75 """The URL of the workspace.""" 76 return f"{self.api_root}/workspaces/{self.workspace_id}" 77 78 # Test connection and creds 79 80 def connect(self) -> None: 81 """Check that the workspace is reachable and raise an exception otherwise. 82 83 Note: It is not necessary to call this method before calling other operations. It 84 serves primarily as a simple check to ensure that the workspace is reachable 85 and credentials are correct. 86 """ 87 _ = api_util.get_workspace( 88 api_root=self.api_root, 89 workspace_id=self.workspace_id, 90 client_id=self.client_id, 91 client_secret=self.client_secret, 92 ) 93 print(f"Successfully connected to workspace: {self.workspace_url}") 94 95 # Get sources, destinations, and connections 96 97 def get_connection( 98 self, 99 connection_id: str, 100 ) -> CloudConnection: 101 """Get a connection by ID. 102 103 This method does not fetch data from the API. It returns a `CloudConnection` object, 104 which will be loaded lazily as needed. 105 """ 106 return CloudConnection( 107 workspace=self, 108 connection_id=connection_id, 109 ) 110 111 def get_source( 112 self, 113 source_id: str, 114 ) -> CloudSource: 115 """Get a source by ID. 116 117 This method does not fetch data from the API. It returns a `CloudSource` object, 118 which will be loaded lazily as needed. 119 """ 120 return CloudSource( 121 workspace=self, 122 connector_id=source_id, 123 ) 124 125 def get_destination( 126 self, 127 destination_id: str, 128 ) -> CloudDestination: 129 """Get a destination by ID. 130 131 This method does not fetch data from the API. 
It returns a `CloudDestination` object, 132 which will be loaded lazily as needed. 133 """ 134 return CloudDestination( 135 workspace=self, 136 connector_id=destination_id, 137 ) 138 139 # Deploy sources and destinations 140 141 def deploy_source( 142 self, 143 name: str, 144 source: Source, 145 *, 146 unique: bool = True, 147 random_name_suffix: bool = False, 148 ) -> CloudSource: 149 """Deploy a source to the workspace. 150 151 Returns the newly deployed source. 152 153 Args: 154 name: The name to use when deploying. 155 source: The source object to deploy. 156 unique: Whether to require a unique name. If `True`, duplicate names 157 are not allowed. Defaults to `True`. 158 random_name_suffix: Whether to append a random suffix to the name. 159 """ 160 source_config_dict = source.get_config().copy() 161 source_config_dict["sourceType"] = source.name.replace("source-", "") 162 163 if random_name_suffix: 164 name += f" (ID: {text_util.generate_random_suffix()})" 165 166 if unique: 167 existing = self.list_sources(name=name) 168 if existing: 169 raise exc.AirbyteDuplicateResourcesError( 170 resource_type="source", 171 resource_name=name, 172 ) 173 174 deployed_source = api_util.create_source( 175 name=name, 176 api_root=self.api_root, 177 workspace_id=self.workspace_id, 178 config=source_config_dict, 179 client_id=self.client_id, 180 client_secret=self.client_secret, 181 ) 182 return CloudSource( 183 workspace=self, 184 connector_id=deployed_source.source_id, 185 ) 186 187 def deploy_destination( 188 self, 189 name: str, 190 destination: Destination | dict[str, Any], 191 *, 192 unique: bool = True, 193 random_name_suffix: bool = False, 194 ) -> CloudDestination: 195 """Deploy a destination to the workspace. 196 197 Returns the newly deployed destination ID. 198 199 Args: 200 name: The name to use when deploying. 201 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 202 dictionary of configuration values. 
203 unique: Whether to require a unique name. If `True`, duplicate names 204 are not allowed. Defaults to `True`. 205 random_name_suffix: Whether to append a random suffix to the name. 206 """ 207 if isinstance(destination, Destination): 208 destination_conf_dict = destination.get_config().copy() 209 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 210 # raise ValueError(destination_conf_dict) 211 else: 212 destination_conf_dict = destination.copy() 213 if "destinationType" not in destination_conf_dict: 214 raise exc.PyAirbyteInputError( 215 message="Missing `destinationType` in configuration dictionary.", 216 ) 217 218 if random_name_suffix: 219 name += f" (ID: {text_util.generate_random_suffix()})" 220 221 if unique: 222 existing = self.list_destinations(name=name) 223 if existing: 224 raise exc.AirbyteDuplicateResourcesError( 225 resource_type="destination", 226 resource_name=name, 227 ) 228 229 deployed_destination = api_util.create_destination( 230 name=name, 231 api_root=self.api_root, 232 workspace_id=self.workspace_id, 233 config=destination_conf_dict, # Wants a dataclass but accepts dict 234 client_id=self.client_id, 235 client_secret=self.client_secret, 236 ) 237 return CloudDestination( 238 workspace=self, 239 connector_id=deployed_destination.destination_id, 240 ) 241 242 def permanently_delete_source( 243 self, 244 source: str | CloudSource, 245 ) -> None: 246 """Delete a source from the workspace. 247 248 You can pass either the source ID `str` or a deployed `Source` object. 
249 """ 250 if not isinstance(source, (str, CloudSource)): 251 raise exc.PyAirbyteInputError( 252 message="Invalid source type.", 253 input_value=type(source).__name__, 254 ) 255 256 api_util.delete_source( 257 source_id=source.connector_id if isinstance(source, CloudSource) else source, 258 api_root=self.api_root, 259 client_id=self.client_id, 260 client_secret=self.client_secret, 261 ) 262 263 # Deploy and delete destinations 264 265 def permanently_delete_destination( 266 self, 267 destination: str | CloudDestination, 268 ) -> None: 269 """Delete a deployed destination from the workspace. 270 271 You can pass either the `Cache` class or the deployed destination ID as a `str`. 272 """ 273 if not isinstance(destination, (str, CloudDestination)): 274 raise exc.PyAirbyteInputError( 275 message="Invalid destination type.", 276 input_value=type(destination).__name__, 277 ) 278 279 api_util.delete_destination( 280 destination_id=( 281 destination if isinstance(destination, str) else destination.destination_id 282 ), 283 api_root=self.api_root, 284 client_id=self.client_id, 285 client_secret=self.client_secret, 286 ) 287 288 # Deploy and delete connections 289 290 def deploy_connection( 291 self, 292 connection_name: str, 293 *, 294 source: CloudSource | str, 295 selected_streams: list[str], 296 destination: CloudDestination | str, 297 table_prefix: str | None = None, 298 ) -> CloudConnection: 299 """Create a new connection between an already deployed source and destination. 300 301 Returns the newly deployed connection object. 302 303 Args: 304 connection_name: The name of the connection. 305 source: The deployed source. You can pass a source ID or a CloudSource object. 306 destination: The deployed destination. You can pass a destination ID or a 307 CloudDestination object. 308 table_prefix: Optional. The table prefix to use when syncing to the destination. 309 selected_streams: The selected stream names to sync within the connection. 
310 """ 311 if not selected_streams: 312 raise exc.PyAirbyteInputError( 313 guidance="You must provide `selected_streams` when creating a connection." 314 ) 315 316 source_id: str = source if isinstance(source, str) else source.connector_id 317 destination_id: str = ( 318 destination if isinstance(destination, str) else destination.connector_id 319 ) 320 321 deployed_connection = api_util.create_connection( 322 name=connection_name, 323 source_id=source_id, 324 destination_id=destination_id, 325 api_root=self.api_root, 326 workspace_id=self.workspace_id, 327 selected_stream_names=selected_streams, 328 prefix=table_prefix or "", 329 client_id=self.client_id, 330 client_secret=self.client_secret, 331 ) 332 333 return CloudConnection( 334 workspace=self, 335 connection_id=deployed_connection.connection_id, 336 source=deployed_connection.source_id, 337 destination=deployed_connection.destination_id, 338 ) 339 340 def permanently_delete_connection( 341 self, 342 connection: str | CloudConnection, 343 *, 344 cascade_delete_source: bool = False, 345 cascade_delete_destination: bool = False, 346 ) -> None: 347 """Delete a deployed connection from the workspace.""" 348 if connection is None: 349 raise ValueError("No connection ID provided.") 350 351 if isinstance(connection, str): 352 connection = CloudConnection( 353 workspace=self, 354 connection_id=connection, 355 ) 356 357 api_util.delete_connection( 358 connection_id=connection.connection_id, 359 api_root=self.api_root, 360 workspace_id=self.workspace_id, 361 client_id=self.client_id, 362 client_secret=self.client_secret, 363 ) 364 365 if cascade_delete_source: 366 self.permanently_delete_source(source=connection.source_id) 367 if cascade_delete_destination: 368 self.permanently_delete_destination(destination=connection.destination_id) 369 370 # List sources, destinations, and connections 371 372 def list_connections( 373 self, 374 name: str | None = None, 375 *, 376 name_filter: Callable | None = None, 377 ) -> 
list[CloudConnection]: 378 """List connections by name in the workspace.""" 379 connections = api_util.list_connections( 380 api_root=self.api_root, 381 workspace_id=self.workspace_id, 382 name=name, 383 name_filter=name_filter, 384 client_id=self.client_id, 385 client_secret=self.client_secret, 386 ) 387 return [ 388 CloudConnection( 389 workspace=self, 390 connection_id=connection.connection_id, 391 source=None, 392 destination=None, 393 ) 394 for connection in connections 395 if name is None or connection.name == name 396 ] 397 398 def list_sources( 399 self, 400 name: str | None = None, 401 *, 402 name_filter: Callable | None = None, 403 ) -> list[CloudSource]: 404 """List all sources in the workspace.""" 405 sources = api_util.list_sources( 406 api_root=self.api_root, 407 workspace_id=self.workspace_id, 408 name=name, 409 name_filter=name_filter, 410 client_id=self.client_id, 411 client_secret=self.client_secret, 412 ) 413 return [ 414 CloudSource( 415 workspace=self, 416 connector_id=source.source_id, 417 ) 418 for source in sources 419 if name is None or source.name == name 420 ] 421 422 def list_destinations( 423 self, 424 name: str | None = None, 425 *, 426 name_filter: Callable | None = None, 427 ) -> list[CloudDestination]: 428 """List all destinations in the workspace.""" 429 destinations = api_util.list_destinations( 430 api_root=self.api_root, 431 workspace_id=self.workspace_id, 432 name=name, 433 name_filter=name_filter, 434 client_id=self.client_id, 435 client_secret=self.client_secret, 436 ) 437 return [ 438 CloudDestination( 439 workspace=self, 440 connector_id=destination.destination_id, 441 ) 442 for destination in destinations 443 if name is None or destination.name == name 444 ]
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        # Whatever the caller passed in is re-wrapped as a `SecretString`.
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str:
        """The URL of the workspace.

        Built as `<api_root>/workspaces/<workspace_id>`. A trailing slash on `api_root`
        is dropped so the result never contains `//workspaces`.
        """
        root = self.api_root.rstrip("/")
        return f"{root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        # The returned workspace info is not needed; the call is made only so that a
        # failure surfaces as an exception.
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.

        Raises:
            AirbyteDuplicateResourcesError: If `unique` is `True` and a source with the
                same (final) name already exists in the workspace.
        """
        source_config_dict = source.get_config().copy()

        # Strip only the leading "source-" prefix; `str.replace` would also remove any
        # later occurrence of "source-" inside the connector name.
        source_type = source.name
        if source_type.startswith("source-"):
            source_type = source_type[len("source-") :]
        source_config_dict["sourceType"] = source_type

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            # The uniqueness check runs against the final name, suffix included.
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.

        Raises:
            PyAirbyteInputError: If a configuration dictionary is passed without a
                `destinationType` key.
            AirbyteDuplicateResourcesError: If `unique` is `True` and a destination with the
                same (final) name already exists in the workspace.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination.get_config().copy()
            # Strip only the leading "destination-" prefix, not every occurrence.
            destination_type = destination.name
            if destination_type.startswith("destination-"):
                destination_type = destination_type[len("destination-") :]
            destination_conf_dict["destinationType"] = destination_type
        else:
            # Copy so the caller's dictionary is not shared with the request payload.
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Raises:
            PyAirbyteInputError: If `source` is neither a `str` nor a `CloudSource`.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the deployed
        destination ID as a `str`.

        Raises:
            PyAirbyteInputError: If `destination` is neither a `str` nor a `CloudDestination`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # `connector_id` is the attribute the rest of this class reads from
            # `CloudDestination` objects.
            destination_id=(
                destination if isinstance(destination, str) else destination.connector_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            selected_streams: The selected stream names to sync within the connection.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.

        Raises:
            PyAirbyteInputError: If `selected_streams` is empty.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",  # a missing prefix is sent as an empty string
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or a `CloudConnection` object.
            cascade_delete_source: If `True`, also delete the connection's source.
            cascade_delete_destination: If `True`, also delete the connection's destination.

        Raises:
            ValueError: If `connection` is `None`.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        # Resolve the cascade targets BEFORE deleting the connection. A handle built
        # from a bare ID is documented above as lazily loaded, so reading these IDs
        # afterwards could mean looking up a connection that no longer exists.
        # NOTE(review): confirm against `CloudConnection.source_id` / `.destination_id`.
        source_id = connection.source_id if cascade_delete_source else None
        destination_id = connection.destination_id if cascade_delete_destination else None

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections in the workspace, optionally restricted by name.

        Args:
            name: If given, only connections with exactly this name are returned.
            name_filter: Optional callable forwarded unchanged to
                `api_util.list_connections`.
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List sources in the workspace, optionally restricted by name.

        Args:
            name: If given, only sources with exactly this name are returned.
            name_filter: Optional callable forwarded unchanged to `api_util.list_sources`.
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List destinations in the workspace, optionally restricted by name.

        Args:
            name: If given, only destinations with exactly this name are returned.
            name_filter: Optional callable forwarded unchanged to
                `api_util.list_destinations`.
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
@property
def workspace_url(self) -> str:
    """The URL of the workspace.

    Built as `<api_root>/workspaces/<workspace_id>`. A trailing slash on `api_root`
    is dropped so the result never contains `//workspaces`.

    Returns:
        The workspace URL; this is always a string.
    """
    root = self.api_root.rstrip("/")
    return f"{root}/workspaces/{self.workspace_id}"
The URL of the workspace.
def connect(self) -> None:
    """Verify that the workspace can be reached, raising an exception if it cannot.

    Note: Calling this first is optional; other operations do not depend on it. It
          exists as a quick sanity check of reachability and credentials.
    """
    # Only the success or failure of the lookup matters; its result is discarded.
    api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    success_message = f"Successfully connected to workspace: {self.workspace_url}"
    print(success_message)
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Return a `CloudConnection` handle for the given connection ID.

    No data is fetched from the API here; the returned object is loaded lazily
    as needed.
    """
    connection_handle = CloudConnection(connection_id=connection_id, workspace=self)
    return connection_handle
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object,
which will be loaded lazily as needed.
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Return a `CloudSource` handle for the given source ID.

    No data is fetched from the API here; the returned object is loaded lazily
    as needed.
    """
    source_handle = CloudSource(connector_id=source_id, workspace=self)
    return source_handle
Get a source by ID.
This method does not fetch data from the API. It returns a `CloudSource` object,
which will be loaded lazily as needed.
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Return a `CloudDestination` handle for the given destination ID.

    No data is fetched from the API here; the returned object is loaded lazily
    as needed.
    """
    destination_handle = CloudDestination(connector_id=destination_id, workspace=self)
    return destination_handle
Get a destination by ID.
This method does not fetch data from the API. It returns a `CloudDestination` object,
which will be loaded lazily as needed.
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.

    Raises:
        AirbyteDuplicateResourcesError: If `unique` is `True` and a source with the
            same (final) name already exists in the workspace.
    """
    source_config_dict = source.get_config().copy()

    # Strip only the leading "source-" prefix; `str.replace` would also remove any
    # later occurrence of "source-" inside the connector name.
    source_type = source.name
    if source_type.startswith("source-"):
        source_type = source_type[len("source-") :]
    source_config_dict["sourceType"] = source_type

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        # The uniqueness check runs against the final name, suffix included.
        existing = self.list_sources(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="source",
                resource_name=name,
            )

    deployed_source = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=source_config_dict,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudSource(
        workspace=self,
        connector_id=deployed_source.source_id,
    )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
            dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.

    Raises:
        PyAirbyteInputError: If a configuration dictionary is passed without a
            `destinationType` key.
        AirbyteDuplicateResourcesError: If `unique` is `True` and a destination with the
            same (final) name already exists in the workspace.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination.get_config().copy()
        # Strip only the leading "destination-" prefix, not every occurrence.
        destination_type = destination.name
        if destination_type.startswith("destination-"):
            destination_type = destination_type[len("destination-") :]
        destination_conf_dict["destinationType"] = destination_type
    else:
        # Copy so the caller's dictionary is not shared with the request payload.
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        # The uniqueness check runs against the final name, suffix included.
        existing = self.list_destinations(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="destination",
                resource_name=name,
            )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(
    self,
    source: str | CloudSource,
) -> None:
    """Permanently remove a source from the workspace.

    Accepts either the source ID as a `str` or a deployed `CloudSource` object.

    Raises:
        PyAirbyteInputError: If `source` is neither a `str` nor a `CloudSource`.
    """
    if isinstance(source, CloudSource):
        resolved_source_id = source.connector_id
    elif isinstance(source, str):
        resolved_source_id = source
    else:
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=resolved_source_id,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either a deployed `CloudDestination` object or the deployed
    destination ID as a `str`.

    Raises:
        PyAirbyteInputError: If `destination` is neither a `str` nor a `CloudDestination`.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        # `connector_id` is the attribute this file's other methods read from
        # `CloudDestination` objects (for example in `deploy_connection`).
        destination_id=(
            destination if isinstance(destination, str) else destination.connector_id
        ),
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the deployed destination ID as a `str`.
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Connect an already deployed source to an already deployed destination.

    Returns a `CloudConnection` object for the newly created connection.

    Args:
        connection_name: The name of the connection.
        source: The deployed source, as a source ID or a `CloudSource` object.
        selected_streams: The stream names to sync within the connection. Must not be empty.
        destination: The deployed destination, as a destination ID or a
            `CloudDestination` object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
            An empty prefix is used when omitted.

    Raises:
        PyAirbyteInputError: If `selected_streams` is empty or not provided.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    # Accept either a bare ID string or a deployed connector object on both ends.
    if isinstance(source, str):
        resolved_source_id: str = source
    else:
        resolved_source_id = source.connector_id

    if isinstance(destination, str):
        resolved_destination_id: str = destination
    else:
        resolved_destination_id = destination.connector_id

    created = api_util.create_connection(
        name=connection_name,
        source_id=resolved_source_id,
        destination_id=resolved_destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    return CloudConnection(
        workspace=self,
        connection_id=created.connection_id,
        source=created.source_id,
        destination=created.destination_id,
    )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection to delete, given either as a connection ID `str` or
            as a `CloudConnection` object.
        cascade_delete_source: If `True`, also permanently delete the connection's
            source once the connection itself has been deleted.
        cascade_delete_destination: If `True`, also permanently delete the connection's
            destination once the connection itself has been deleted.

    Raises:
        ValueError: If `connection` is `None`.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    # Resolve the IDs needed for the cascading deletes *before* the connection is removed.
    # NOTE(review): a `CloudConnection` built from a bare ID presumably has to look up
    # its source/destination IDs remotely, which may no longer work once the connection
    # has been deleted -- confirm against `CloudConnection`.
    source_id = connection.source_id if cascade_delete_source else None
    destination_id = connection.destination_id if cascade_delete_destination else None

    api_util.delete_connection(
        connection_id=connection.connection_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    if cascade_delete_source:
        self.permanently_delete_source(source=source_id)
    if cascade_delete_destination:
        self.permanently_delete_destination(destination=destination_id)
Delete a deployed connection from the workspace.
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
    """List connections in the workspace, optionally restricted by name.

    Args:
        name: Optional. If given, only connections whose name equals this value
            are returned.
        name_filter: Optional. Passed through unchanged to the underlying API
            utility's `name_filter` argument.

    Returns:
        A list of `CloudConnection` objects, one per matching connection.
    """
    api_results = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    matched: list[CloudConnection] = []
    for item in api_results:
        # Exact-name match is enforced here too, independent of the API-side filtering.
        if name is None or item.name == name:
            matched.append(
                CloudConnection(
                    workspace=self,
                    connection_id=item.connection_id,
                    source=None,
                    destination=None,
                )
            )
    return matched
List connections by name in the workspace.
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
    """List sources in the workspace, optionally restricted by name.

    Args:
        name: Optional. If given, only sources whose name equals this value
            are returned.
        name_filter: Optional. Passed through unchanged to the underlying API
            utility's `name_filter` argument.

    Returns:
        A list of `CloudSource` objects, one per matching source.
    """
    api_results = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    matched: list[CloudSource] = []
    for item in api_results:
        # Exact-name match is enforced here too, independent of the API-side filtering.
        if name is None or item.name == name:
            matched.append(
                CloudSource(
                    workspace=self,
                    connector_id=item.source_id,
                )
            )
    return matched
List all sources in the workspace.
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List destinations in the workspace, optionally restricted by name.

    Args:
        name: Optional. If given, only destinations whose name equals this value
            are returned.
        name_filter: Optional. Passed through unchanged to the underlying API
            utility's `name_filter` argument.

    Returns:
        A list of `CloudDestination` objects, one per matching destination.
    """
    api_results = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    matched: list[CloudDestination] = []
    for item in api_results:
        # Exact-name match is enforced here too, independent of the API-side filtering.
        if name is None or item.name == name:
            matched.append(
                CloudDestination(
                    workspace=self,
                    connector_id=item.destination_id,
                )
            )
    return matched
List all destinations in the workspace.