# airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
## Usage Examples
Get a new workspace object and deploy a source to it:
```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
## CloudWorkspace

```python
@dataclass
class CloudWorkspace:
    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT
```
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
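On init, `client_id` and `client_secret` are wrapped in `SecretString` so they are handled securely. As a minimal sketch of pointing a workspace object at a self-managed instance (the IDs are placeholders, and the exact `api_root` URL shape is an assumption that depends on your deployment):

```python
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",  # placeholder workspace ID
    client_id="...",
    client_secret="...",
    # Assumed URL shape for a self-managed deployment; when omitted,
    # api_root defaults to the Airbyte Cloud API root.
    api_root="https://airbyte.internal.example.com/api/public/v1",
)
```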
```python
@property
def workspace_url(self) -> str | None:
```
The web URL of the workspace.
```python
@property
def organization_id(self) -> str | None:
```
The ID of the organization this workspace belongs to.
This value is cached after the first lookup.
```python
@property
def organization_name(self) -> str | None:
```
The name of the organization this workspace belongs to.
This value is cached after the first lookup.
```python
def connect(self) -> None:
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
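For example, as a quick sanity check after constructing the workspace object above:

```python
# Raises an exception if the workspace is unreachable or the credentials
# are invalid; on success, prints the workspace URL.
workspace.connect()
```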
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object,
which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
```
Get a source by ID.
This method does not fetch data from the API. It returns a `CloudSource` object,
which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
```
Get a destination by ID.
This method does not fetch data from the API. It returns a `CloudDestination` object,
which will be loaded lazily as needed.
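None of the three getters makes an API request; each returns a lightweight handle that fetches data on first use. A sketch with placeholder IDs:

```python
# No API calls happen here; data is loaded lazily when accessed.
connection = workspace.get_connection(connection_id="...")
source = workspace.get_source(source_id="...")
destination = workspace.get_destination(destination_id="...")
```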
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
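For example, a sketch of deploying with a random name suffix to avoid collisions in a shared workspace (the connector and config values are illustrative):

```python
import airbyte as ab

source = ab.get_source("source-faker", config={"count": 100})
deployed = workspace.deploy_source(
    name="faker-demo",
    source=source,
    random_name_suffix=True,  # deploys as "faker-demo (ID: <random suffix>)"
)
```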
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
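When passing a raw configuration dictionary, the `destinationType` key is required. A sketch, assuming the DuckDB connector; the connector-specific config keys shown are assumptions about that connector's spec:

```python
deployed_destination = workspace.deploy_destination(
    name="dev-duckdb",
    destination={
        "destinationType": "duckdb",  # required when passing a plain dict
        "destination_path": "/tmp/dev.duckdb",  # assumed connector-specific option
    },
)
```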
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
```
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
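A sketch, reusing the `deployed` source from the earlier example. With the default `safe_mode=True`, deletion is refused unless the source name contains "delete-me" or "deleteme":

```python
# The name check is skipped when safe_mode is disabled (use with care).
workspace.permanently_delete_source(deployed, safe_mode=False)
```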
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
```
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
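A sketch wiring together the source and destination deployed in the earlier examples. Stream names are connector-specific; `users` and `products` are streams of `source-faker`:

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",
    source=deployed,                   # CloudSource object or source ID string
    destination=deployed_destination,  # CloudDestination object or destination ID string
    selected_streams=["users", "products"],
    table_prefix="demo_",
)
```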
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
```
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
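For example, tearing down the connection from the sketch above together with both of its connectors in one call:

```python
workspace.permanently_delete_connection(
    connection,
    cascade_delete_source=True,
    cascade_delete_destination=True,
    safe_mode=False,  # skip the "delete-me" name check for all three deletions
)
```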
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
```
List connections by name in the workspace.
TODO: Add pagination support
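You can match an exact name or supply a predicate. A sketch, assuming `name_filter` is called with each connection's name and returns a bool:

```python
# Exact-name match:
connections = workspace.list_connections(name="faker-to-duckdb")

# Predicate on the connection name (assumed callable signature):
demo_connections = workspace.list_connections(
    name_filter=lambda name: "demo" in name.lower(),
)
```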
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace.

    TODO: Add pagination support
    """
    destinations = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            destination_response=destination,
        )
        for destination in destinations
        if name is None or destination.name == name
    ]
```
List all destinations in the workspace.
TODO: Add pagination support
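Both listing methods mirror `list_connections`. A combined sketch (again assuming `name_filter` receives the connector name as a string):

```python
# List everything in the workspace:
sources = workspace.list_sources()
destinations = workspace.list_destinations()

# Or narrow the results with a predicate on the name:
faker_sources = workspace.list_sources(
    name_filter=lambda n: "faker" in n.lower(),
)
```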
```python
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
        AirbyteDuplicateResourcesError: If unique=True and name already exists
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing = self.list_custom_source_definitions(
            definition_type="yaml" if is_yaml else "docker",
        )
        if any(d.name == name for d in existing):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if is_yaml:
        manifest_dict: dict[str, Any]
        if isinstance(manifest_yaml, Path):
            manifest_dict = yaml.safe_load(manifest_yaml.read_text())
        elif isinstance(manifest_yaml, str):
            manifest_dict = yaml.safe_load(manifest_yaml)
        elif manifest_yaml is not None:
            manifest_dict = manifest_yaml
        else:
            raise exc.PyAirbyteInputError(
                message="manifest_yaml is required for YAML connectors",
                context={"name": name},
            )

        if pre_validate:
            api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

        result = api_util.create_custom_yaml_source_definition(
            name=name,
            workspace_id=self.workspace_id,
            manifest=manifest_dict,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
            self, result
        )

        # Set testing values if provided
        if testing_values is not None:
            custom_definition.set_testing_values(testing_values)

        return custom_definition

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image are provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
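A usage sketch for the YAML path, which is the only path currently implemented (the Docker branch raises NotImplementedError). The manifest path and testing values below are illustrative placeholders:

```python
from pathlib import Path

# Publish a low-code connector from a manifest file on disk;
# a dict or a raw YAML string is also accepted.
definition = workspace.publish_custom_source_definition(
    name="my-custom-source",
    manifest_yaml=Path("manifest.yaml"),
    pre_validate=True,  # validate the manifest client-side before publishing
    testing_values={"api_key": "..."},  # replaces any existing testing values
)
```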
```python
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
    """List custom source connector definitions.

    Args:
        definition_type: Connector type to list ("yaml" or "docker"). Required.

    Returns:
        List of CustomCloudSourceDefinition objects matching the specified type
    """
    if definition_type == "yaml":
        yaml_definitions = api_util.list_custom_yaml_source_definitions(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
            for d in yaml_definitions
        ]

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
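For example (only "yaml" is currently implemented; passing "docker" raises NotImplementedError):

```python
yaml_definitions = workspace.list_custom_source_definitions(
    definition_type="yaml",
)
for definition in yaml_definitions:
    print(definition.name)
```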
```python
def get_custom_source_definition(
    self,
    definition_id: str,
    *,
    definition_type: Literal["yaml", "docker"],
) -> CustomCloudSourceDefinition:
    """Get a specific custom source definition by ID.

    Args:
        definition_id: The definition ID
        definition_type: Connector type ("yaml" or "docker"). Required.

    Returns:
        CustomCloudSourceDefinition object
    """
    if definition_type == "yaml":
        result = api_util.get_custom_yaml_source_definition(
            workspace_id=self.workspace_id,
            definition_id=definition_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
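A retrieval sketch (the definition ID is a placeholder):

```python
definition = workspace.get_custom_source_definition(
    definition_id="<definition-id>",
    definition_type="yaml",
)
```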