# `airbyte.cloud.workspaces`

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding `api_root`, you can use this module to interact with self-managed Airbyte
instances, both OSS and Enterprise.
## Usage Examples

Get a new workspace object and deploy a source to it:
import airbyte as ab
from airbyte import cloud
workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)
# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)
# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.

## Usage Examples

Get a new workspace object and deploy a source to it:

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Deploy a source to the workspace
source = ab.get_source("source-faker", config={"count": 100})
deployed_source = workspace.deploy_source(
    name="test-source",
    source=source,
)

# Run a check on the deployed source and raise an exception if the check fails
check_result = deployed_source.check(raise_on_error=True)

# Permanently delete the newly-created source
workspace.permanently_delete_source(deployed_source)
```
"""

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

import yaml

from airbyte import exceptions as exc
from airbyte._util import api_util, text_util
from airbyte._util.api_util import get_web_url_root
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import (
    CloudDestination,
    CloudSource,
    CustomCloudSourceDefinition,
)
from airbyte.destinations.base import Destination
from airbyte.secrets.base import SecretString


if TYPE_CHECKING:
    from collections.abc import Callable

    from airbyte.sources.base import Source


@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        # Wrap credentials in `SecretString` even if plain `str` values were passed,
        # so they are masked in logs and reprs.
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str:
        """The web URL of the workspace.

        Return type narrowed from `str | None`: this f-string always yields a `str`.
        """
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        # The result is discarded; we only care that the call does not raise.
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        # Copy so we never mutate the local source object's own config.
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        # The Cloud API expects the bare connector type, e.g. "faker" for "source-faker".
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination object (not just its ID).

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination`
                object or a dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            # The Cloud API expects the bare connector type, e.g. "duckdb" for
            # "destination-duckdb".
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            # Raw config dicts must declare their own connector type; we cannot infer it.
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the destination ID `str` or a deployed `CloudDestination` object.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # Use `connector_id`, consistent with how `deploy_connection` and
            # `permanently_delete_source` address Cloud connector objects.
            destination_id=(
                destination if isinstance(destination, str) else destination.connector_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID `str` or a `CloudConnection` object.
            cascade_delete_source: Also delete the connection's source.
            cascade_delete_destination: Also delete the connection's destination.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        # Cascade deletes run only after the connection itself is gone, so a failure
        # here cannot leave a half-deleted connection behind.
        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        # Re-check the name client-side as a belt-and-suspenders filter on top of the
        # server-side `name` argument.
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        # Exactly one of the two deployment modes must be selected.
        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            # Normalize the manifest input (Path / YAML string / dict) into a dict.
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                # Unreachable when `is_yaml` is True; kept for type-checker narrowing.
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
# NOTE(review): This `CloudWorkspace` definition duplicates the one defined earlier
# in this file — it looks like an extraction/concatenation artifact. Confirm against
# the upstream repository and deduplicate; as written, this second definition
# silently shadows the first at import time.
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    # Unique ID of the target workspace.
    workspace_id: str
    # OAuth client credentials; coerced to `SecretString` in `__post_init__`.
    client_id: SecretString
    client_secret: SecretString
    # Defaults to the hosted Cloud API root; override for self-managed instances.
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        # Re-wrap so plain `str` credentials are masked in logs/reprs.
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        # NOTE(review): this f-string always yields a `str`; the `| None` in the
        # annotation appears vestigial — confirm and narrow.
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        # Result discarded on purpose: we only care that the call does not raise.
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        # Copy so the local source object's own config is never mutated.
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        # The Cloud API expects the bare connector type, e.g. "faker" for "source-faker".
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination ID.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            # The Cloud API expects the bare connector type, e.g. "duckdb" for
            # "destination-duckdb".
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
            # NOTE(review): leftover debug statement below — candidate for removal.
            # raise ValueError(destination_conf_dict)
        else:
            # Raw config dicts must declare their own connector type; it cannot be inferred.
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # NOTE(review): other methods address `CloudDestination` via `.connector_id`
            # (see `deploy_connection`); confirm `.destination_id` is an equivalent alias.
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        # Accept either raw IDs or Cloud connector objects.
        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        # NOTE(review): raises `ValueError` here, while sibling methods raise
        # `exc.PyAirbyteInputError` for bad input — confirm whether this is intentional.
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        # Cascade deletes run only after the connection itself is deleted.
        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        # Client-side name re-check on top of the server-side `name` argument.
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        # Exactly one of the two deployment modes must be selected.
        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            # Normalize the manifest input (Path / YAML string / dict) into a dict.
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                # Unreachable when `is_yaml` is True; kept for type-checker narrowing.
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
82 @property 83 def workspace_url(self) -> str | None: 84 """The web URL of the workspace.""" 85 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
The web URL of the workspace.
89 def connect(self) -> None: 90 """Check that the workspace is reachable and raise an exception otherwise. 91 92 Note: It is not necessary to call this method before calling other operations. It 93 serves primarily as a simple check to ensure that the workspace is reachable 94 and credentials are correct. 95 """ 96 _ = api_util.get_workspace( 97 api_root=self.api_root, 98 workspace_id=self.workspace_id, 99 client_id=self.client_id, 100 client_secret=self.client_secret, 101 ) 102 print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
106 def get_connection( 107 self, 108 connection_id: str, 109 ) -> CloudConnection: 110 """Get a connection by ID. 111 112 This method does not fetch data from the API. It returns a `CloudConnection` object, 113 which will be loaded lazily as needed. 114 """ 115 return CloudConnection( 116 workspace=self, 117 connection_id=connection_id, 118 )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
120 def get_source( 121 self, 122 source_id: str, 123 ) -> CloudSource: 124 """Get a source by ID. 125 126 This method does not fetch data from the API. It returns a `CloudSource` object, 127 which will be loaded lazily as needed. 128 """ 129 return CloudSource( 130 workspace=self, 131 connector_id=source_id, 132 )
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
134 def get_destination( 135 self, 136 destination_id: str, 137 ) -> CloudDestination: 138 """Get a destination by ID. 139 140 This method does not fetch data from the API. It returns a `CloudDestination` object, 141 which will be loaded lazily as needed. 142 """ 143 return CloudDestination( 144 workspace=self, 145 connector_id=destination_id, 146 )
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
150 def deploy_source( 151 self, 152 name: str, 153 source: Source, 154 *, 155 unique: bool = True, 156 random_name_suffix: bool = False, 157 ) -> CloudSource: 158 """Deploy a source to the workspace. 159 160 Returns the newly deployed source. 161 162 Args: 163 name: The name to use when deploying. 164 source: The source object to deploy. 165 unique: Whether to require a unique name. If `True`, duplicate names 166 are not allowed. Defaults to `True`. 167 random_name_suffix: Whether to append a random suffix to the name. 168 """ 169 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 170 source_config_dict["sourceType"] = source.name.replace("source-", "") 171 172 if random_name_suffix: 173 name += f" (ID: {text_util.generate_random_suffix()})" 174 175 if unique: 176 existing = self.list_sources(name=name) 177 if existing: 178 raise exc.AirbyteDuplicateResourcesError( 179 resource_type="source", 180 resource_name=name, 181 ) 182 183 deployed_source = api_util.create_source( 184 name=name, 185 api_root=self.api_root, 186 workspace_id=self.workspace_id, 187 config=source_config_dict, 188 client_id=self.client_id, 189 client_secret=self.client_secret, 190 ) 191 return CloudSource( 192 workspace=self, 193 connector_id=deployed_source.source_id, 194 )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
 - source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
 
196 def deploy_destination( 197 self, 198 name: str, 199 destination: Destination | dict[str, Any], 200 *, 201 unique: bool = True, 202 random_name_suffix: bool = False, 203 ) -> CloudDestination: 204 """Deploy a destination to the workspace. 205 206 Returns the newly deployed destination ID. 207 208 Args: 209 name: The name to use when deploying. 210 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 211 dictionary of configuration values. 212 unique: Whether to require a unique name. If `True`, duplicate names 213 are not allowed. Defaults to `True`. 214 random_name_suffix: Whether to append a random suffix to the name. 215 """ 216 if isinstance(destination, Destination): 217 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 218 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 219 # raise ValueError(destination_conf_dict) 220 else: 221 destination_conf_dict = destination.copy() 222 if "destinationType" not in destination_conf_dict: 223 raise exc.PyAirbyteInputError( 224 message="Missing `destinationType` in configuration dictionary.", 225 ) 226 227 if random_name_suffix: 228 name += f" (ID: {text_util.generate_random_suffix()})" 229 230 if unique: 231 existing = self.list_destinations(name=name) 232 if existing: 233 raise exc.AirbyteDuplicateResourcesError( 234 resource_type="destination", 235 resource_name=name, 236 ) 237 238 deployed_destination = api_util.create_destination( 239 name=name, 240 api_root=self.api_root, 241 workspace_id=self.workspace_id, 242 config=destination_conf_dict, # Wants a dataclass but accepts dict 243 client_id=self.client_id, 244 client_secret=self.client_secret, 245 ) 246 return CloudDestination( 247 workspace=self, 248 connector_id=deployed_destination.destination_id, 249 )
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
 
251 def permanently_delete_source( 252 self, 253 source: str | CloudSource, 254 ) -> None: 255 """Delete a source from the workspace. 256 257 You can pass either the source ID `str` or a deployed `Source` object. 258 """ 259 if not isinstance(source, (str, CloudSource)): 260 raise exc.PyAirbyteInputError( 261 message="Invalid source type.", 262 input_value=type(source).__name__, 263 ) 264 265 api_util.delete_source( 266 source_id=source.connector_id if isinstance(source, CloudSource) else source, 267 api_root=self.api_root, 268 client_id=self.client_id, 269 client_secret=self.client_secret, 270 )
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
274 def permanently_delete_destination( 275 self, 276 destination: str | CloudDestination, 277 ) -> None: 278 """Delete a deployed destination from the workspace. 279 280 You can pass either the `Cache` class or the deployed destination ID as a `str`. 281 """ 282 if not isinstance(destination, (str, CloudDestination)): 283 raise exc.PyAirbyteInputError( 284 message="Invalid destination type.", 285 input_value=type(destination).__name__, 286 ) 287 288 api_util.delete_destination( 289 destination_id=( 290 destination if isinstance(destination, str) else destination.destination_id 291 ), 292 api_root=self.api_root, 293 client_id=self.client_id, 294 client_secret=self.client_secret, 295 )
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
299 def deploy_connection( 300 self, 301 connection_name: str, 302 *, 303 source: CloudSource | str, 304 selected_streams: list[str], 305 destination: CloudDestination | str, 306 table_prefix: str | None = None, 307 ) -> CloudConnection: 308 """Create a new connection between an already deployed source and destination. 309 310 Returns the newly deployed connection object. 311 312 Args: 313 connection_name: The name of the connection. 314 source: The deployed source. You can pass a source ID or a CloudSource object. 315 destination: The deployed destination. You can pass a destination ID or a 316 CloudDestination object. 317 table_prefix: Optional. The table prefix to use when syncing to the destination. 318 selected_streams: The selected stream names to sync within the connection. 319 """ 320 if not selected_streams: 321 raise exc.PyAirbyteInputError( 322 guidance="You must provide `selected_streams` when creating a connection." 323 ) 324 325 source_id: str = source if isinstance(source, str) else source.connector_id 326 destination_id: str = ( 327 destination if isinstance(destination, str) else destination.connector_id 328 ) 329 330 deployed_connection = api_util.create_connection( 331 name=connection_name, 332 source_id=source_id, 333 destination_id=destination_id, 334 api_root=self.api_root, 335 workspace_id=self.workspace_id, 336 selected_stream_names=selected_streams, 337 prefix=table_prefix or "", 338 client_id=self.client_id, 339 client_secret=self.client_secret, 340 ) 341 342 return CloudConnection( 343 workspace=self, 344 connection_id=deployed_connection.connection_id, 345 source=deployed_connection.source_id, 346 destination=deployed_connection.destination_id, 347 )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
 - source: The deployed source. You can pass a source ID or a CloudSource object.
 - destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
 - table_prefix: Optional. The table prefix to use when syncing to the destination.
 - selected_streams: The selected stream names to sync within the connection.
 
349 def permanently_delete_connection( 350 self, 351 connection: str | CloudConnection, 352 *, 353 cascade_delete_source: bool = False, 354 cascade_delete_destination: bool = False, 355 ) -> None: 356 """Delete a deployed connection from the workspace.""" 357 if connection is None: 358 raise ValueError("No connection ID provided.") 359 360 if isinstance(connection, str): 361 connection = CloudConnection( 362 workspace=self, 363 connection_id=connection, 364 ) 365 366 api_util.delete_connection( 367 connection_id=connection.connection_id, 368 api_root=self.api_root, 369 workspace_id=self.workspace_id, 370 client_id=self.client_id, 371 client_secret=self.client_secret, 372 ) 373 374 if cascade_delete_source: 375 self.permanently_delete_source(source=connection.source_id) 376 if cascade_delete_destination: 377 self.permanently_delete_destination(destination=connection.destination_id)
Delete a deployed connection from the workspace.
381 def list_connections( 382 self, 383 name: str | None = None, 384 *, 385 name_filter: Callable | None = None, 386 ) -> list[CloudConnection]: 387 """List connections by name in the workspace. 388 389 TODO: Add pagination support 390 """ 391 connections = api_util.list_connections( 392 api_root=self.api_root, 393 workspace_id=self.workspace_id, 394 name=name, 395 name_filter=name_filter, 396 client_id=self.client_id, 397 client_secret=self.client_secret, 398 ) 399 return [ 400 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 401 workspace=self, 402 connection_response=connection, 403 ) 404 for connection in connections 405 if name is None or connection.name == name 406 ]
List connections by name in the workspace.
TODO: Add pagination support
408 def list_sources( 409 self, 410 name: str | None = None, 411 *, 412 name_filter: Callable | None = None, 413 ) -> list[CloudSource]: 414 """List all sources in the workspace. 415 416 TODO: Add pagination support 417 """ 418 sources = api_util.list_sources( 419 api_root=self.api_root, 420 workspace_id=self.workspace_id, 421 name=name, 422 name_filter=name_filter, 423 client_id=self.client_id, 424 client_secret=self.client_secret, 425 ) 426 return [ 427 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 428 workspace=self, 429 source_response=source, 430 ) 431 for source in sources 432 if name is None or source.name == name 433 ]
List all sources in the workspace.
TODO: Add pagination support
435 def list_destinations( 436 self, 437 name: str | None = None, 438 *, 439 name_filter: Callable | None = None, 440 ) -> list[CloudDestination]: 441 """List all destinations in the workspace. 442 443 TODO: Add pagination support 444 """ 445 destinations = api_util.list_destinations( 446 api_root=self.api_root, 447 workspace_id=self.workspace_id, 448 name=name, 449 name_filter=name_filter, 450 client_id=self.client_id, 451 client_secret=self.client_secret, 452 ) 453 return [ 454 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 455 workspace=self, 456 destination_response=destination, 457 ) 458 for destination in destinations 459 if name is None or destination.name == name 460 ]
List all destinations in the workspace.
TODO: Add pagination support
462 def publish_custom_source_definition( 463 self, 464 name: str, 465 *, 466 manifest_yaml: dict[str, Any] | Path | str | None = None, 467 docker_image: str | None = None, 468 docker_tag: str | None = None, 469 unique: bool = True, 470 pre_validate: bool = True, 471 ) -> CustomCloudSourceDefinition: 472 """Publish a custom source connector definition. 473 474 You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image 475 and docker_tag (for Docker connectors), but not both. 476 477 Args: 478 name: Display name for the connector definition 479 manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string) 480 docker_image: Docker repository (e.g., 'airbyte/source-custom') 481 docker_tag: Docker image tag (e.g., '1.0.0') 482 unique: Whether to enforce name uniqueness 483 pre_validate: Whether to validate manifest client-side (YAML only) 484 485 Returns: 486 CustomCloudSourceDefinition object representing the created definition 487 488 Raises: 489 PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided 490 AirbyteDuplicateResourcesError: If unique=True and name already exists 491 """ 492 is_yaml = manifest_yaml is not None 493 is_docker = docker_image is not None 494 495 if is_yaml == is_docker: 496 raise exc.PyAirbyteInputError( 497 message=( 498 "Must specify EITHER manifest_yaml (for YAML connectors) OR " 499 "docker_image + docker_tag (for Docker connectors), but not both" 500 ), 501 context={ 502 "manifest_yaml_provided": is_yaml, 503 "docker_image_provided": is_docker, 504 }, 505 ) 506 507 if is_docker and docker_tag is None: 508 raise exc.PyAirbyteInputError( 509 message="docker_tag is required when docker_image is specified", 510 context={"docker_image": docker_image}, 511 ) 512 513 if unique: 514 existing = self.list_custom_source_definitions( 515 definition_type="yaml" if is_yaml else "docker", 516 ) 517 if any(d.name == name for d in existing): 518 raise exc.AirbyteDuplicateResourcesError( 
519 resource_type="custom_source_definition", 520 resource_name=name, 521 ) 522 523 if is_yaml: 524 manifest_dict: dict[str, Any] 525 if isinstance(manifest_yaml, Path): 526 manifest_dict = yaml.safe_load(manifest_yaml.read_text()) 527 elif isinstance(manifest_yaml, str): 528 manifest_dict = yaml.safe_load(manifest_yaml) 529 elif manifest_yaml is not None: 530 manifest_dict = manifest_yaml 531 else: 532 raise exc.PyAirbyteInputError( 533 message="manifest_yaml is required for YAML connectors", 534 context={"name": name}, 535 ) 536 537 if pre_validate: 538 api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) 539 540 result = api_util.create_custom_yaml_source_definition( 541 name=name, 542 workspace_id=self.workspace_id, 543 manifest=manifest_dict, 544 api_root=self.api_root, 545 client_id=self.client_id, 546 client_secret=self.client_secret, 547 ) 548 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 549 550 raise NotImplementedError( 551 "Docker custom source definitions are not yet supported. " 552 "Only YAML manifest-based custom sources are currently available." 553 )
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
 - manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
 - docker_image: Docker repository (e.g., 'airbyte/source-custom')
 - docker_tag: Docker image tag (e.g., '1.0.0')
 - unique: Whether to enforce name uniqueness
 - pre_validate: Whether to validate manifest client-side (YAML only)
 
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
 - AirbyteDuplicateResourcesError: If unique=True and name already exists
 
555 def list_custom_source_definitions( 556 self, 557 *, 558 definition_type: Literal["yaml", "docker"], 559 ) -> list[CustomCloudSourceDefinition]: 560 """List custom source connector definitions. 561 562 Args: 563 definition_type: Connector type to list ("yaml" or "docker"). Required. 564 565 Returns: 566 List of CustomCloudSourceDefinition objects matching the specified type 567 """ 568 if definition_type == "yaml": 569 yaml_definitions = api_util.list_custom_yaml_source_definitions( 570 workspace_id=self.workspace_id, 571 api_root=self.api_root, 572 client_id=self.client_id, 573 client_secret=self.client_secret, 574 ) 575 return [ 576 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 577 for d in yaml_definitions 578 ] 579 580 raise NotImplementedError( 581 "Docker custom source definitions are not yet supported. " 582 "Only YAML manifest-based custom sources are currently available." 583 )
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
 
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
585 def get_custom_source_definition( 586 self, 587 definition_id: str, 588 *, 589 definition_type: Literal["yaml", "docker"], 590 ) -> CustomCloudSourceDefinition: 591 """Get a specific custom source definition by ID. 592 593 Args: 594 definition_id: The definition ID 595 definition_type: Connector type ("yaml" or "docker"). Required. 596 597 Returns: 598 CustomCloudSourceDefinition object 599 """ 600 if definition_type == "yaml": 601 result = api_util.get_custom_yaml_source_definition( 602 workspace_id=self.workspace_id, 603 definition_id=definition_id, 604 api_root=self.api_root, 605 client_id=self.client_id, 606 client_secret=self.client_secret, 607 ) 608 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 609 610 raise NotImplementedError( 611 "Docker custom source definitions are not yet supported. " 612 "Only YAML manifest-based custom sources are currently available." 613 )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
 - definition_type: Connector type ("yaml" or "docker"). Required.
 
Returns:
CustomCloudSourceDefinition object