# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
Module source (imports and public exports; the module docstring is rendered above):

```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import client_config, connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
```
## CloudWorkspace

```python
@dataclass
class CloudWorkspace:
    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )
```
A remote workspace on the Airbyte Cloud.

By overriding `api_root`, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

1. OAuth2 client credentials (`client_id` + `client_secret`)
2. Bearer token authentication

Example with client credentials:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

Example with bearer token:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```
```python
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
) -> CloudWorkspace:
```
Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a
convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

1. Bearer token (checked first)
2. OAuth2 client credentials (fallback)

Environment variables used:

- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
- `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

Arguments:

- workspace_id: The workspace ID. If not provided, will be resolved from the
  `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
- api_root: The API root URL. If not provided, will be resolved from the
  `AIRBYTE_CLOUD_API_URL` environment variable, or default to the Airbyte Cloud API.

Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:

- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.

Example:

```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
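As a further sketch, credentials can also be injected via `os.environ` before calling `from_env()`; the token and workspace ID below are placeholders, not real values:

```python
import os

from airbyte.cloud import CloudWorkspace

# Hypothetical values for illustration only:
os.environ["AIRBYTE_CLOUD_BEARER_TOKEN"] = "my-bearer-token"
os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"] = "my-workspace-id"

# The bearer token is checked first, so this resolves to bearer auth:
workspace = CloudWorkspace.from_env()
```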
```python
@property
def workspace_url(self) -> str | None:
```
The web URL of the workspace.
```python
def get_organization(
    self,
    *,
    raise_on_error: bool = True,
) -> CloudOrganization | None:
```
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
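With `raise_on_error=False`, a fallback pattern looks like this sketch (reusing the `workspace` object from the module examples):

```python
# Returns None instead of raising if the credentials lack
# ORGANIZATION_READER permissions on the organization:
org = workspace.get_organization(raise_on_error=False)
if org is not None:
    print(f"Organization: {org.organization_name} ({org.organization_id})")
else:
    print("Organization info unavailable with these credentials.")
```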
```python
def connect(self) -> None:
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
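A quick smoke test of credentials might look like the following sketch; the `airbyte.exceptions` import path and the exact exception type raised are assumptions here, not confirmed by this page:

```python
from airbyte.exceptions import AirbyteError  # assumed import path

try:
    workspace.connect()  # Prints the workspace URL on success
except AirbyteError:
    print("Could not reach the workspace; check credentials and workspace_id.")
```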
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
```
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
```
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
```
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
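Because all three getters defer API calls, they are cheap to construct; a sketch with placeholder IDs:

```python
# None of these calls hit the API; data is fetched lazily on first use.
connection = workspace.get_connection(connection_id="my-connection-id")
source = workspace.get_source(source_id="my-source-id")
destination = workspace.get_destination(destination_id="my-destination-id")
```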
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
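For example, a sketch that deploys a locally configured connector (the connector name and config are illustrative):

```python
import airbyte as ab

# Configure a local source object, then deploy it to the workspace.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
)
print(cloud_source.connector_id)
```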
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
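When passing a raw dictionary instead of a local `Destination` object, the dict must include a `destinationType` key; a minimal sketch with placeholder configuration:

```python
cloud_destination = workspace.deploy_destination(
    name="My Destination",
    destination={
        "destinationType": "dev-null",  # hypothetical example type
        # ...connector-specific configuration values go here...
    },
)
print(cloud_destination.connector_id)
```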
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
```
Delete a source from the workspace.
You can pass either the source ID as a `str` or a deployed `CloudSource` object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
```
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
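Both delete helpers share the safe-mode guard; a sketch (IDs are hypothetical):

```python
# With safe_mode=True (the default), deletion is refused unless the
# resource name contains "delete-me" or "deleteme" (case insensitive).
workspace.permanently_delete_source(source="my-source-id")

# Bypass the name check explicitly (use with care):
workspace.permanently_delete_destination(
    destination="my-destination-id",
    safe_mode=False,
)
```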
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
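Putting it together, a sketch that wires the source and destination deployed in the earlier examples (stream names are illustrative):

```python
connection = workspace.deploy_connection(
    connection_name="Faker to My Destination",
    source=cloud_source,            # CloudSource object or source ID string
    destination=cloud_destination,  # CloudDestination object or ID string
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
sync_result = connection.run_sync()
```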
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
```
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
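A sketch of a full teardown, continuing from the connection deployed above:

```python
# Delete the connection plus the source and destination behind it.
workspace.permanently_delete_connection(
    connection=connection,
    cascade_delete_source=True,
    cascade_delete_destination=True,
    safe_mode=False,  # skip the "delete-me" name check
)
```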
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
```
List connections by name in the workspace.
TODO: Add pagination support
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
```
List all destinations in the workspace.
TODO: Add pagination support
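A sketch of the two filtering modes, assuming the `name_filter` callable receives the resource name (the `Callable` is left untyped in the signatures above):

```python
# Exact-name match:
matches = workspace.list_sources(name="My Faker Source")

# Or filter with a callable:
dev_connections = workspace.list_connections(
    name_filter=lambda n: n.startswith("dev-"),
)
```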
```python
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
```
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
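For example, a sketch that publishes a manifest from a local file; the file path and the `testing_values` keys are placeholders:

```python
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),  # a dict or YAML string also works
    testing_values={"api_key": "test-key"},  # hypothetical config for Builder testing
)
```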
```python
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
```
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
858 def get_custom_source_definition( 859 self, 860 definition_id: str, 861 *, 862 definition_type: Literal["yaml", "docker"], 863 ) -> CustomCloudSourceDefinition: 864 """Get a specific custom source definition by ID. 865 866 Args: 867 definition_id: The definition ID 868 definition_type: Connector type ("yaml" or "docker"). Required. 869 870 Returns: 871 CustomCloudSourceDefinition object 872 """ 873 if definition_type == "yaml": 874 result = api_util.get_custom_yaml_source_definition( 875 workspace_id=self.workspace_id, 876 definition_id=definition_id, 877 api_root=self.api_root, 878 client_id=self.client_id, 879 client_secret=self.client_secret, 880 bearer_token=self.bearer_token, 881 ) 882 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 883 884 raise NotImplementedError( 885 "Docker custom source definitions are not yet supported. " 886 "Only YAML manifest-based custom sources are currently available." 887 )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
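For illustration, listing and fetching definitions might look like the sketch below; the definition ID is a placeholder:

```python
# List all YAML-based custom source definitions in the workspace
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)

# Fetch one definition by ID (placeholder ID shown)
definition = workspace.get_custom_source_definition(
    "00000000-0000-0000-0000-000000000000",
    definition_type="yaml",
)
```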
21class CloudConnection: # noqa: PLR0904 # Too many public methods 22 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 23 24 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 25 """ 26 27 def __init__( 28 self, 29 workspace: CloudWorkspace, 30 connection_id: str, 31 source: str | None = None, 32 destination: str | None = None, 33 ) -> None: 34 """It is not recommended to create a `CloudConnection` object directly. 35 36 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 37 """ 38 self.connection_id = connection_id 39 """The ID of the connection.""" 40 41 self.workspace = workspace 42 """The workspace that the connection belongs to.""" 43 44 self._source_id = source 45 """The ID of the source.""" 46 47 self._destination_id = destination 48 """The ID of the destination.""" 49 50 self._connection_info: ConnectionResponse | None = None 51 """The connection info object. (Cached.)""" 52 53 self._cloud_source_object: CloudSource | None = None 54 """The source object. (Cached.)""" 55 56 self._cloud_destination_object: CloudDestination | None = None 57 """The destination object. (Cached.)""" 58 59 def _fetch_connection_info( 60 self, 61 *, 62 force_refresh: bool = False, 63 verify: bool = True, 64 ) -> ConnectionResponse: 65 """Fetch and cache connection info from the API. 66 67 By default, this method will only fetch from the API if connection info is not 68 already cached. It also verifies that the connection belongs to the expected 69 workspace unless verification is explicitly disabled. 70 71 Args: 72 force_refresh: If True, always fetch from the API even if cached. 73 If False (default), only fetch if not already cached. 74 verify: If True (default), verify that the connection is valid (e.g., that 75 the workspace_id matches this object's workspace). Raises an error if 76 validation fails. 77 78 Returns: 79 The ConnectionResponse from the API. 80 81 Raises: 82 AirbyteWorkspaceMismatchError: If verify is True and the connection's 83 workspace_id doesn't match the expected workspace. 84 AirbyteMissingResourceError: If the connection doesn't exist. 85 """ 86 if not force_refresh and self._connection_info is not None: 87 # Use cached info, but still verify if requested 88 if verify: 89 self._verify_workspace_match(self._connection_info) 90 return self._connection_info 91 92 # Fetch from API 93 connection_info = api_util.get_connection( 94 workspace_id=self.workspace.workspace_id, 95 connection_id=self.connection_id, 96 api_root=self.workspace.api_root, 97 client_id=self.workspace.client_id, 98 client_secret=self.workspace.client_secret, 99 bearer_token=self.workspace.bearer_token, 100 ) 101 102 # Cache the result first (before verification may raise) 103 self._connection_info = connection_info 104 105 # Verify if requested 106 if verify: 107 self._verify_workspace_match(connection_info) 108 109 return connection_info 110 111 def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None: 112 """Verify that the connection belongs to the expected workspace. 113 114 Raises: 115 AirbyteWorkspaceMismatchError: If the workspace IDs don't match. 
116 """ 117 if connection_info.workspace_id != self.workspace.workspace_id: 118 raise AirbyteWorkspaceMismatchError( 119 resource_type="connection", 120 resource_id=self.connection_id, 121 workspace=self.workspace, 122 expected_workspace_id=self.workspace.workspace_id, 123 actual_workspace_id=connection_info.workspace_id, 124 message=( 125 f"Connection '{self.connection_id}' belongs to workspace " 126 f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'." 127 ), 128 ) 129 130 def check_is_valid(self) -> bool: 131 """Check if this connection exists and belongs to the expected workspace. 132 133 This method fetches connection info from the API (if not already cached) and 134 verifies that the connection's workspace_id matches the workspace associated 135 with this CloudConnection object. 136 137 Returns: 138 True if the connection exists and belongs to the expected workspace. 139 140 Raises: 141 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 142 AirbyteMissingResourceError: If the connection doesn't exist. 143 """ 144 self._fetch_connection_info(force_refresh=False, verify=True) 145 return True 146 147 @classmethod 148 def _from_connection_response( 149 cls, 150 workspace: CloudWorkspace, 151 connection_response: ConnectionResponse, 152 ) -> CloudConnection: 153 """Create a CloudConnection from a ConnectionResponse.""" 154 result = cls( 155 workspace=workspace, 156 connection_id=connection_response.connection_id, 157 source=connection_response.source_id, 158 destination=connection_response.destination_id, 159 ) 160 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 161 return result 162 163 # Properties 164 165 @property 166 def name(self) -> str | None: 167 """Get the display name of the connection, if available. 168 169 E.g. "My Postgres to Snowflake", not the connection ID. 
170 """ 171 if not self._connection_info: 172 self._connection_info = self._fetch_connection_info() 173 174 return self._connection_info.name 175 176 @property 177 def source_id(self) -> str: 178 """The ID of the source.""" 179 if not self._source_id: 180 if not self._connection_info: 181 self._connection_info = self._fetch_connection_info() 182 183 self._source_id = self._connection_info.source_id 184 185 return self._source_id 186 187 @property 188 def source(self) -> CloudSource: 189 """Get the source object.""" 190 if self._cloud_source_object: 191 return self._cloud_source_object 192 193 self._cloud_source_object = CloudSource( 194 workspace=self.workspace, 195 connector_id=self.source_id, 196 ) 197 return self._cloud_source_object 198 199 @property 200 def destination_id(self) -> str: 201 """The ID of the destination.""" 202 if not self._destination_id: 203 if not self._connection_info: 204 self._connection_info = self._fetch_connection_info() 205 206 self._destination_id = self._connection_info.destination_id 207 208 return self._destination_id 209 210 @property 211 def destination(self) -> CloudDestination: 212 """Get the destination object.""" 213 if self._cloud_destination_object: 214 return self._cloud_destination_object 215 216 self._cloud_destination_object = CloudDestination( 217 workspace=self.workspace, 218 connector_id=self.destination_id, 219 ) 220 return self._cloud_destination_object 221 222 @property 223 def stream_names(self) -> list[str]: 224 """The stream names.""" 225 if not self._connection_info: 226 self._connection_info = self._fetch_connection_info() 227 228 return [stream.name for stream in self._connection_info.configurations.streams or []] 229 230 @property 231 def table_prefix(self) -> str: 232 """The table prefix.""" 233 if not self._connection_info: 234 self._connection_info = self._fetch_connection_info() 235 236 return self._connection_info.prefix or "" 237 238 @property 239 def connection_url(self) -> str | None: 240 """The web URL to the connection.""" 241 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 242 243 @property 244 def job_history_url(self) -> str | None: 245 """The URL to the job history for the connection.""" 246 return f"{self.connection_url}/timeline" 247 248 # Run Sync 249 250 def run_sync( 251 self, 252 *, 253 wait: bool = True, 254 wait_timeout: int = 300, 255 ) -> SyncResult: 256 """Run a sync.""" 257 connection_response = api_util.run_connection( 258 connection_id=self.connection_id, 259 api_root=self.workspace.api_root, 260 workspace_id=self.workspace.workspace_id, 261 client_id=self.workspace.client_id, 262 client_secret=self.workspace.client_secret, 263 bearer_token=self.workspace.bearer_token, 264 ) 265 sync_result = SyncResult( 266 workspace=self.workspace, 267 connection=self, 268 job_id=connection_response.job_id, 269 ) 270 271 if wait: 272 sync_result.wait_for_completion( 273 wait_timeout=wait_timeout, 274 raise_failure=True, 275 raise_timeout=True, 276 ) 277 278 return sync_result 279 280 def __repr__(self) -> str: 281 """String representation of the connection.""" 282 return ( 283 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 284 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 285 ) 286 287 # Logs 288 289 def get_previous_sync_logs( 290 self, 291 *, 292 limit: int = 20, 293 offset: int | None = None, 294 from_tail: bool = True, 295 ) -> list[SyncResult]: 296 """Get previous sync jobs for a connection with pagination support. 
297 298 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 299 rows_synced, start_time). Full log text can be fetched lazily via 300 `SyncResult.get_full_log_text()`. 301 302 Args: 303 limit: Maximum number of jobs to return. Defaults to 20. 304 offset: Number of jobs to skip from the beginning. Defaults to None (0). 305 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 306 If False, returns jobs ordered oldest-first (createdAt ASC). 307 Defaults to True. 308 309 Returns: 310 A list of SyncResult objects representing the sync jobs. 311 """ 312 order_by = ( 313 api_util.JOB_ORDER_BY_CREATED_AT_DESC 314 if from_tail 315 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 316 ) 317 sync_logs: list[JobResponse] = api_util.get_job_logs( 318 connection_id=self.connection_id, 319 api_root=self.workspace.api_root, 320 workspace_id=self.workspace.workspace_id, 321 limit=limit, 322 offset=offset, 323 order_by=order_by, 324 client_id=self.workspace.client_id, 325 client_secret=self.workspace.client_secret, 326 bearer_token=self.workspace.bearer_token, 327 ) 328 return [ 329 SyncResult( 330 workspace=self.workspace, 331 connection=self, 332 job_id=sync_log.job_id, 333 _latest_job_info=sync_log, 334 ) 335 for sync_log in sync_logs 336 ] 337 338 def get_sync_result( 339 self, 340 job_id: int | None = None, 341 ) -> SyncResult | None: 342 """Get the sync result for the connection. 343 344 If `job_id` is not provided, the most recent sync job will be used. 345 346 Returns `None` if job_id is omitted and no previous jobs are found. 347 """ 348 if job_id is None: 349 # Get the most recent sync job 350 results = self.get_previous_sync_logs( 351 limit=1, 352 ) 353 if results: 354 return results[0] 355 356 return None 357 358 # Get the sync job by ID (lazy loaded) 359 return SyncResult( 360 workspace=self.workspace, 361 connection=self, 362 job_id=job_id, 363 ) 364 365 # Artifacts 366 367 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 368 """Get the connection state artifacts. 369 370 Returns the persisted state for this connection, which can be used 371 when debugging incremental syncs. 372 373 Uses the Config API endpoint: POST /v1/state/get 374 375 Returns: 376 List of state objects for each stream, or None if no state is set. 377 """ 378 state_response = api_util.get_connection_state( 379 connection_id=self.connection_id, 380 api_root=self.workspace.api_root, 381 client_id=self.workspace.client_id, 382 client_secret=self.workspace.client_secret, 383 bearer_token=self.workspace.bearer_token, 384 ) 385 if state_response.get("stateType") == "not_set": 386 return None 387 return state_response.get("streamState", []) 388 389 def get_catalog_artifact(self) -> dict[str, Any] | None: 390 """Get the configured catalog for this connection. 391 392 Returns the full configured catalog (syncCatalog) for this connection, 393 including stream schemas, sync modes, cursor fields, and primary keys. 394 395 Uses the Config API endpoint: POST /v1/web_backend/connections/get 396 397 Returns: 398 Dictionary containing the configured catalog, or `None` if not found. 
399 """ 400 connection_response = api_util.get_connection_catalog( 401 connection_id=self.connection_id, 402 api_root=self.workspace.api_root, 403 client_id=self.workspace.client_id, 404 client_secret=self.workspace.client_secret, 405 bearer_token=self.workspace.bearer_token, 406 ) 407 return connection_response.get("syncCatalog") 408 409 def rename(self, name: str) -> CloudConnection: 410 """Rename the connection. 411 412 Args: 413 name: New name for the connection 414 415 Returns: 416 Updated CloudConnection object with refreshed info 417 """ 418 updated_response = api_util.patch_connection( 419 connection_id=self.connection_id, 420 api_root=self.workspace.api_root, 421 client_id=self.workspace.client_id, 422 client_secret=self.workspace.client_secret, 423 bearer_token=self.workspace.bearer_token, 424 name=name, 425 ) 426 self._connection_info = updated_response 427 return self 428 429 def set_table_prefix(self, prefix: str) -> CloudConnection: 430 """Set the table prefix for the connection. 431 432 Args: 433 prefix: New table prefix to use when syncing to the destination 434 435 Returns: 436 Updated CloudConnection object with refreshed info 437 """ 438 updated_response = api_util.patch_connection( 439 connection_id=self.connection_id, 440 api_root=self.workspace.api_root, 441 client_id=self.workspace.client_id, 442 client_secret=self.workspace.client_secret, 443 bearer_token=self.workspace.bearer_token, 444 prefix=prefix, 445 ) 446 self._connection_info = updated_response 447 return self 448 449 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 450 """Set the selected streams for the connection. 451 452 This is a destructive operation that can break existing connections if the 453 stream selection is changed incorrectly. Use with caution. 454 455 Args: 456 stream_names: List of stream names to sync 457 458 Returns: 459 Updated CloudConnection object with refreshed info 460 """ 461 configurations = api_util.build_stream_configurations(stream_names) 462 463 updated_response = api_util.patch_connection( 464 connection_id=self.connection_id, 465 api_root=self.workspace.api_root, 466 client_id=self.workspace.client_id, 467 client_secret=self.workspace.client_secret, 468 bearer_token=self.workspace.bearer_token, 469 configurations=configurations, 470 ) 471 self._connection_info = updated_response 472 return self 473 474 # Enable/Disable 475 476 @property 477 def enabled(self) -> bool: 478 """Get the current enabled status of the connection. 479 480 This property always fetches fresh data from the API to ensure accuracy, 481 as another process or user may have toggled the setting. 482 483 Returns: 484 True if the connection status is 'active', False otherwise. 485 """ 486 connection_info = self._fetch_connection_info(force_refresh=True) 487 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE 488 489 @enabled.setter 490 def enabled(self, value: bool) -> None: 491 """Set the enabled status of the connection. 492 493 Args: 494 value: True to enable (set status to 'active'), False to disable 495 (set status to 'inactive'). 496 """ 497 self.set_enabled(enabled=value) 498 499 def set_enabled( 500 self, 501 *, 502 enabled: bool, 503 ignore_noop: bool = True, 504 ) -> None: 505 """Set the enabled status of the connection. 506 507 Args: 508 enabled: True to enable (set status to 'active'), False to disable 509 (set status to 'inactive'). 510 ignore_noop: If True (default), silently return if the connection is already 511 in the requested state. 
If False, raise ValueError when the requested 512 state matches the current state. 513 514 Raises: 515 ValueError: If ignore_noop is False and the connection is already in the 516 requested state. 517 """ 518 # Always fetch fresh data to check current status 519 connection_info = self._fetch_connection_info(force_refresh=True) 520 current_status = connection_info.status 521 desired_status = ( 522 api_util.models.ConnectionStatusEnum.ACTIVE 523 if enabled 524 else api_util.models.ConnectionStatusEnum.INACTIVE 525 ) 526 527 if current_status == desired_status: 528 if ignore_noop: 529 return 530 raise ValueError( 531 f"Connection is already {'enabled' if enabled else 'disabled'}. " 532 f"Current status: {current_status}" 533 ) 534 535 updated_response = api_util.patch_connection( 536 connection_id=self.connection_id, 537 api_root=self.workspace.api_root, 538 client_id=self.workspace.client_id, 539 client_secret=self.workspace.client_secret, 540 bearer_token=self.workspace.bearer_token, 541 status=desired_status, 542 ) 543 self._connection_info = updated_response 544 545 # Scheduling 546 547 def set_schedule( 548 self, 549 cron_expression: str, 550 ) -> None: 551 """Set a cron schedule for the connection. 552 553 Args: 554 cron_expression: A cron expression defining when syncs should run. 555 556 Examples: 557 - "0 0 * * *" - Daily at midnight UTC 558 - "0 */6 * * *" - Every 6 hours 559 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 560 """ 561 schedule = api_util.models.AirbyteAPIConnectionSchedule( 562 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 563 cron_expression=cron_expression, 564 ) 565 updated_response = api_util.patch_connection( 566 connection_id=self.connection_id, 567 api_root=self.workspace.api_root, 568 client_id=self.workspace.client_id, 569 client_secret=self.workspace.client_secret, 570 bearer_token=self.workspace.bearer_token, 571 schedule=schedule, 572 ) 573 self._connection_info = updated_response 574 575 def set_manual_schedule(self) -> None: 576 """Set the connection to manual scheduling. 577 578 Disables automatic syncs. Syncs will only run when manually triggered. 579 """ 580 schedule = api_util.models.AirbyteAPIConnectionSchedule( 581 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 582 ) 583 updated_response = api_util.patch_connection( 584 connection_id=self.connection_id, 585 api_root=self.workspace.api_root, 586 client_id=self.workspace.client_id, 587 client_secret=self.workspace.client_secret, 588 bearer_token=self.workspace.bearer_token, 589 schedule=schedule, 590 ) 591 self._connection_info = updated_response 592 593 # Deletions 594 595 def permanently_delete( 596 self, 597 *, 598 cascade_delete_source: bool = False, 599 cascade_delete_destination: bool = False, 600 ) -> None: 601 """Delete the connection. 602 603 Args: 604 cascade_delete_source: Whether to also delete the source. 605 cascade_delete_destination: Whether to also delete the destination. 606 """ 607 self.workspace.permanently_delete_connection(self) 608 609 if cascade_delete_source: 610 self.workspace.permanently_delete_source(self.source_id) 611 612 if cascade_delete_destination: 613 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
27 def __init__( 28 self, 29 workspace: CloudWorkspace, 30 connection_id: str, 31 source: str | None = None, 32 destination: str | None = None, 33 ) -> None: 34 """It is not recommended to create a `CloudConnection` object directly. 35 36 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 37 """ 38 self.connection_id = connection_id 39 """The ID of the connection.""" 40 41 self.workspace = workspace 42 """The workspace that the connection belongs to.""" 43 44 self._source_id = source 45 """The ID of the source.""" 46 47 self._destination_id = destination 48 """The ID of the destination.""" 49 50 self._connection_info: ConnectionResponse | None = None 51 """The connection info object. (Cached.)""" 52 53 self._cloud_source_object: CloudSource | None = None 54 """The source object. (Cached.)""" 55 56 self._cloud_destination_object: CloudDestination | None = None 57 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
130 def check_is_valid(self) -> bool: 131 """Check if this connection exists and belongs to the expected workspace. 132 133 This method fetches connection info from the API (if not already cached) and 134 verifies that the connection's workspace_id matches the workspace associated 135 with this CloudConnection object. 136 137 Returns: 138 True if the connection exists and belongs to the expected workspace. 139 140 Raises: 141 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 142 AirbyteMissingResourceError: If the connection doesn't exist. 143 """ 144 self._fetch_connection_info(force_refresh=False, verify=True) 145 return True
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
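A short sketch of the validation flow; the connection ID is a placeholder, and the exception's import path is an assumption here:

```python
from airbyte.exceptions import AirbyteWorkspaceMismatchError  # import path assumed

connection = workspace.get_connection(connection_id="456")
try:
    connection.check_is_valid()  # returns True, or raises
    print("Connection exists and belongs to this workspace.")
except AirbyteWorkspaceMismatchError:
    print("Connection exists, but in a different workspace.")
```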
165 @property 166 def name(self) -> str | None: 167 """Get the display name of the connection, if available. 168 169 E.g. "My Postgres to Snowflake", not the connection ID. 170 """ 171 if not self._connection_info: 172 self._connection_info = self._fetch_connection_info() 173 174 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
176 @property 177 def source_id(self) -> str: 178 """The ID of the source.""" 179 if not self._source_id: 180 if not self._connection_info: 181 self._connection_info = self._fetch_connection_info() 182 183 self._source_id = self._connection_info.source_id 184 185 return self._source_id
The ID of the source.
187 @property 188 def source(self) -> CloudSource: 189 """Get the source object.""" 190 if self._cloud_source_object: 191 return self._cloud_source_object 192 193 self._cloud_source_object = CloudSource( 194 workspace=self.workspace, 195 connector_id=self.source_id, 196 ) 197 return self._cloud_source_object
Get the source object.
199 @property 200 def destination_id(self) -> str: 201 """The ID of the destination.""" 202 if not self._destination_id: 203 if not self._connection_info: 204 self._connection_info = self._fetch_connection_info() 205 206 self._destination_id = self._connection_info.destination_id 207 208 return self._destination_id
The ID of the destination.
210 @property 211 def destination(self) -> CloudDestination: 212 """Get the destination object.""" 213 if self._cloud_destination_object: 214 return self._cloud_destination_object 215 216 self._cloud_destination_object = CloudDestination( 217 workspace=self.workspace, 218 connector_id=self.destination_id, 219 ) 220 return self._cloud_destination_object
Get the destination object.
222 @property 223 def stream_names(self) -> list[str]: 224 """The stream names.""" 225 if not self._connection_info: 226 self._connection_info = self._fetch_connection_info() 227 228 return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
230 @property 231 def table_prefix(self) -> str: 232 """The table prefix.""" 233 if not self._connection_info: 234 self._connection_info = self._fetch_connection_info() 235 236 return self._connection_info.prefix or ""
The table prefix.
238 @property 239 def connection_url(self) -> str | None: 240 """The web URL to the connection.""" 241 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
243 @property 244 def job_history_url(self) -> str | None: 245 """The URL to the job history for the connection.""" 246 return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
250 def run_sync( 251 self, 252 *, 253 wait: bool = True, 254 wait_timeout: int = 300, 255 ) -> SyncResult: 256 """Run a sync.""" 257 connection_response = api_util.run_connection( 258 connection_id=self.connection_id, 259 api_root=self.workspace.api_root, 260 workspace_id=self.workspace.workspace_id, 261 client_id=self.workspace.client_id, 262 client_secret=self.workspace.client_secret, 263 bearer_token=self.workspace.bearer_token, 264 ) 265 sync_result = SyncResult( 266 workspace=self.workspace, 267 connection=self, 268 job_id=connection_response.job_id, 269 ) 270 271 if wait: 272 sync_result.wait_for_completion( 273 wait_timeout=wait_timeout, 274 raise_failure=True, 275 raise_timeout=True, 276 ) 277 278 return sync_result
Run a sync. If `wait` is True (the default), this call blocks until the job reaches a final status, raising on failure or timeout (after `wait_timeout` seconds); if `wait` is False, it returns immediately with a `SyncResult` that can be polled.
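For example, a non-blocking sketch (IDs elided):

```python
# Start the sync without blocking
sync_result = connection.run_sync(wait=False)

# ...do other work, then wait up to 10 minutes for a final status.
# By default this raises on timeout (raise_timeout=True).
status = sync_result.wait_for_completion(wait_timeout=600)
print(f"Final status: {status}")
```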
289 def get_previous_sync_logs( 290 self, 291 *, 292 limit: int = 20, 293 offset: int | None = None, 294 from_tail: bool = True, 295 ) -> list[SyncResult]: 296 """Get previous sync jobs for a connection with pagination support. 297 298 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 299 rows_synced, start_time). Full log text can be fetched lazily via 300 `SyncResult.get_full_log_text()`. 301 302 Args: 303 limit: Maximum number of jobs to return. Defaults to 20. 304 offset: Number of jobs to skip from the beginning. Defaults to None (0). 305 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 306 If False, returns jobs ordered oldest-first (createdAt ASC). 307 Defaults to True. 308 309 Returns: 310 A list of SyncResult objects representing the sync jobs. 311 """ 312 order_by = ( 313 api_util.JOB_ORDER_BY_CREATED_AT_DESC 314 if from_tail 315 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 316 ) 317 sync_logs: list[JobResponse] = api_util.get_job_logs( 318 connection_id=self.connection_id, 319 api_root=self.workspace.api_root, 320 workspace_id=self.workspace.workspace_id, 321 limit=limit, 322 offset=offset, 323 order_by=order_by, 324 client_id=self.workspace.client_id, 325 client_secret=self.workspace.client_secret, 326 bearer_token=self.workspace.bearer_token, 327 ) 328 return [ 329 SyncResult( 330 workspace=self.workspace, 331 connection=self, 332 job_id=sync_log.job_id, 333 _latest_job_info=sync_log, 334 ) 335 for sync_log in sync_logs 336 ]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
Returns:
A list of SyncResult objects representing the sync jobs.
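A small pagination sketch, assuming `connection` is an existing `CloudConnection`:

```python
# Newest-first pages of sync history, ten jobs at a time
page_one = connection.get_previous_sync_logs(limit=10)
page_two = connection.get_previous_sync_logs(limit=10, offset=10)

for result in page_one:
    print(result.job_id, result.get_job_status(), result.records_synced)
```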
338 def get_sync_result( 339 self, 340 job_id: int | None = None, 341 ) -> SyncResult | None: 342 """Get the sync result for the connection. 343 344 If `job_id` is not provided, the most recent sync job will be used. 345 346 Returns `None` if job_id is omitted and no previous jobs are found. 347 """ 348 if job_id is None: 349 # Get the most recent sync job 350 results = self.get_previous_sync_logs( 351 limit=1, 352 ) 353 if results: 354 return results[0] 355 356 return None 357 358 # Get the sync job by ID (lazy loaded) 359 return SyncResult( 360 workspace=self.workspace, 361 connection=self, 362 job_id=job_id, 363 )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
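For instance:

```python
latest = connection.get_sync_result()  # most recent job, or None
if latest is not None:
    print(latest.get_job_status())

# Or reference a known job by ID (placeholder ID shown)
specific = connection.get_sync_result(job_id=12345)
```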
367 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 368 """Get the connection state artifacts. 369 370 Returns the persisted state for this connection, which can be used 371 when debugging incremental syncs. 372 373 Uses the Config API endpoint: POST /v1/state/get 374 375 Returns: 376 List of state objects for each stream, or None if no state is set. 377 """ 378 state_response = api_util.get_connection_state( 379 connection_id=self.connection_id, 380 api_root=self.workspace.api_root, 381 client_id=self.workspace.client_id, 382 client_secret=self.workspace.client_secret, 383 bearer_token=self.workspace.bearer_token, 384 ) 385 if state_response.get("stateType") == "not_set": 386 return None 387 return state_response.get("streamState", [])
Get the connection state artifacts.
Returns the persisted state for this connection, which can be used when debugging incremental syncs.
Uses the Config API endpoint: POST /v1/state/get
Returns:
List of state objects for each stream, or None if no state is set.
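A brief sketch of inspecting persisted state:

```python
state = connection.get_state_artifacts()
if state is None:
    print("No state persisted for this connection yet.")
else:
    for stream_state in state:  # one entry per stream
        print(stream_state)
```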
389 def get_catalog_artifact(self) -> dict[str, Any] | None: 390 """Get the configured catalog for this connection. 391 392 Returns the full configured catalog (syncCatalog) for this connection, 393 including stream schemas, sync modes, cursor fields, and primary keys. 394 395 Uses the Config API endpoint: POST /v1/web_backend/connections/get 396 397 Returns: 398 Dictionary containing the configured catalog, or `None` if not found. 399 """ 400 connection_response = api_util.get_connection_catalog( 401 connection_id=self.connection_id, 402 api_root=self.workspace.api_root, 403 client_id=self.workspace.client_id, 404 client_secret=self.workspace.client_secret, 405 bearer_token=self.workspace.bearer_token, 406 ) 407 return connection_response.get("syncCatalog")
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or None if not found.
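As an illustration only; the nested key layout shown follows the Config API's syncCatalog shape and should be treated as an assumption:

```python
catalog = connection.get_catalog_artifact()
if catalog:
    for stream in catalog.get("streams", []):
        config = stream.get("config", {})  # per-stream sync configuration
        print(config.get("aliasName"), config.get("syncMode"), config.get("cursorField"))
```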
409 def rename(self, name: str) -> CloudConnection: 410 """Rename the connection. 411 412 Args: 413 name: New name for the connection 414 415 Returns: 416 Updated CloudConnection object with refreshed info 417 """ 418 updated_response = api_util.patch_connection( 419 connection_id=self.connection_id, 420 api_root=self.workspace.api_root, 421 client_id=self.workspace.client_id, 422 client_secret=self.workspace.client_secret, 423 bearer_token=self.workspace.bearer_token, 424 name=name, 425 ) 426 self._connection_info = updated_response 427 return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
429 def set_table_prefix(self, prefix: str) -> CloudConnection: 430 """Set the table prefix for the connection. 431 432 Args: 433 prefix: New table prefix to use when syncing to the destination 434 435 Returns: 436 Updated CloudConnection object with refreshed info 437 """ 438 updated_response = api_util.patch_connection( 439 connection_id=self.connection_id, 440 api_root=self.workspace.api_root, 441 client_id=self.workspace.client_id, 442 client_secret=self.workspace.client_secret, 443 bearer_token=self.workspace.bearer_token, 444 prefix=prefix, 445 ) 446 self._connection_info = updated_response 447 return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
449 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 450 """Set the selected streams for the connection. 451 452 This is a destructive operation that can break existing connections if the 453 stream selection is changed incorrectly. Use with caution. 454 455 Args: 456 stream_names: List of stream names to sync 457 458 Returns: 459 Updated CloudConnection object with refreshed info 460 """ 461 configurations = api_util.build_stream_configurations(stream_names) 462 463 updated_response = api_util.patch_connection( 464 connection_id=self.connection_id, 465 api_root=self.workspace.api_root, 466 client_id=self.workspace.client_id, 467 client_secret=self.workspace.client_secret, 468 bearer_token=self.workspace.bearer_token, 469 configurations=configurations, 470 ) 471 self._connection_info = updated_response 472 return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
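Because each of these setters returns the updated `CloudConnection`, calls can be chained; a sketch with placeholder values:

```python
connection = (
    connection.rename("Postgres to Snowflake (prod)")
    .set_table_prefix("prod_")
    .set_selected_streams(["users", "orders"])  # destructive; see warning above
)
```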
476 @property 477 def enabled(self) -> bool: 478 """Get the current enabled status of the connection. 479 480 This property always fetches fresh data from the API to ensure accuracy, 481 as another process or user may have toggled the setting. 482 483 Returns: 484 True if the connection status is 'active', False otherwise. 485 """ 486 connection_info = self._fetch_connection_info(force_refresh=True) 487 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
499 def set_enabled( 500 self, 501 *, 502 enabled: bool, 503 ignore_noop: bool = True, 504 ) -> None: 505 """Set the enabled status of the connection. 506 507 Args: 508 enabled: True to enable (set status to 'active'), False to disable 509 (set status to 'inactive'). 510 ignore_noop: If True (default), silently return if the connection is already 511 in the requested state. If False, raise ValueError when the requested 512 state matches the current state. 513 514 Raises: 515 ValueError: If ignore_noop is False and the connection is already in the 516 requested state. 517 """ 518 # Always fetch fresh data to check current status 519 connection_info = self._fetch_connection_info(force_refresh=True) 520 current_status = connection_info.status 521 desired_status = ( 522 api_util.models.ConnectionStatusEnum.ACTIVE 523 if enabled 524 else api_util.models.ConnectionStatusEnum.INACTIVE 525 ) 526 527 if current_status == desired_status: 528 if ignore_noop: 529 return 530 raise ValueError( 531 f"Connection is already {'enabled' if enabled else 'disabled'}. " 532 f"Current status: {current_status}" 533 ) 534 535 updated_response = api_util.patch_connection( 536 connection_id=self.connection_id, 537 api_root=self.workspace.api_root, 538 client_id=self.workspace.client_id, 539 client_secret=self.workspace.client_secret, 540 bearer_token=self.workspace.bearer_token, 541 status=desired_status, 542 ) 543 self._connection_info = updated_response
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
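A short sketch of the enable/disable behavior:

```python
if connection.enabled:  # property access always re-fetches from the API
    connection.set_enabled(enabled=False)  # disable the connection
    connection.set_enabled(enabled=False)  # already disabled: silent no-op
    connection.set_enabled(enabled=False, ignore_noop=False)  # raises ValueError
```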
547 def set_schedule( 548 self, 549 cron_expression: str, 550 ) -> None: 551 """Set a cron schedule for the connection. 552 553 Args: 554 cron_expression: A cron expression defining when syncs should run. 555 556 Examples: 557 - "0 0 * * *" - Daily at midnight UTC 558 - "0 */6 * * *" - Every 6 hours 559 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 560 """ 561 schedule = api_util.models.AirbyteAPIConnectionSchedule( 562 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 563 cron_expression=cron_expression, 564 ) 565 updated_response = api_util.patch_connection( 566 connection_id=self.connection_id, 567 api_root=self.workspace.api_root, 568 client_id=self.workspace.client_id, 569 client_secret=self.workspace.client_secret, 570 bearer_token=self.workspace.bearer_token, 571 schedule=schedule, 572 ) 573 self._connection_info = updated_response
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
575 def set_manual_schedule(self) -> None: 576 """Set the connection to manual scheduling. 577 578 Disables automatic syncs. Syncs will only run when manually triggered. 579 """ 580 schedule = api_util.models.AirbyteAPIConnectionSchedule( 581 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 582 ) 583 updated_response = api_util.patch_connection( 584 connection_id=self.connection_id, 585 api_root=self.workspace.api_root, 586 client_id=self.workspace.client_id, 587 client_secret=self.workspace.client_secret, 588 bearer_token=self.workspace.bearer_token, 589 schedule=schedule, 590 ) 591 self._connection_info = updated_response
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
595 def permanently_delete( 596 self, 597 *, 598 cascade_delete_source: bool = False, 599 cascade_delete_destination: bool = False, 600 ) -> None: 601 """Delete the connection. 602 603 Args: 604 cascade_delete_source: Whether to also delete the source. 605 cascade_delete_destination: Whether to also delete the destination. 606 """ 607 self.workspace.permanently_delete_connection(self) 608 609 if cascade_delete_source: 610 self.workspace.permanently_delete_source(self.source_id) 611 612 if cascade_delete_destination: 613 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
57@dataclass 58class CloudClientConfig: 59 """Client configuration for Airbyte Cloud API. 60 61 This class encapsulates the authentication and API configuration needed to connect 62 to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually 63 exclusive authentication methods: 64 65 1. OAuth2 client credentials flow (client_id + client_secret) 66 2. Bearer token authentication 67 68 Exactly one authentication method must be provided. Providing both or neither 69 will raise a validation error. 70 71 Attributes: 72 client_id: OAuth2 client ID for client credentials flow. 73 client_secret: OAuth2 client secret for client credentials flow. 74 bearer_token: Pre-generated bearer token for direct authentication. 75 api_root: The API root URL. Defaults to Airbyte Cloud API. 76 """ 77 78 client_id: SecretString | None = None 79 """OAuth2 client ID for client credentials authentication.""" 80 81 client_secret: SecretString | None = None 82 """OAuth2 client secret for client credentials authentication.""" 83 84 bearer_token: SecretString | None = None 85 """Bearer token for direct authentication (alternative to client credentials).""" 86 87 api_root: str = api_util.CLOUD_API_ROOT 88 """The API root URL. Defaults to Airbyte Cloud API.""" 89 90 def __post_init__(self) -> None: 91 """Validate credentials and ensure secrets are properly wrapped.""" 92 # Wrap secrets in SecretString if they aren't already 93 if self.client_id is not None: 94 self.client_id = SecretString(self.client_id) 95 if self.client_secret is not None: 96 self.client_secret = SecretString(self.client_secret) 97 if self.bearer_token is not None: 98 self.bearer_token = SecretString(self.bearer_token) 99 100 # Validate mutual exclusivity 101 has_client_credentials = self.client_id is not None or self.client_secret is not None 102 has_bearer_token = self.bearer_token is not None 103 104 if has_client_credentials and has_bearer_token: 105 raise PyAirbyteInputError( 106 message="Cannot use both client credentials and bearer token authentication.", 107 guidance=( 108 "Provide either client_id and client_secret together, " 109 "or bearer_token alone, but not both." 110 ), 111 ) 112 113 if has_client_credentials and (self.client_id is None or self.client_secret is None): 114 # If using client credentials, both must be provided 115 raise PyAirbyteInputError( 116 message="Incomplete client credentials.", 117 guidance=( 118 "When using client credentials authentication, " 119 "both client_id and client_secret must be provided." 120 ), 121 ) 122 123 if not has_client_credentials and not has_bearer_token: 124 raise PyAirbyteInputError( 125 message="No authentication credentials provided.", 126 guidance=( 127 "Provide either client_id and client_secret together for OAuth2 " 128 "client credentials flow, or bearer_token for direct authentication." 129 ), 130 ) 131 132 @property 133 def uses_bearer_token(self) -> bool: 134 """Return True if using bearer token authentication.""" 135 return self.bearer_token is not None 136 137 @property 138 def uses_client_credentials(self) -> bool: 139 """Return True if using client credentials authentication.""" 140 return self.client_id is not None and self.client_secret is not None 141 142 @classmethod 143 def from_env( 144 cls, 145 *, 146 api_root: str | None = None, 147 ) -> CloudClientConfig: 148 """Create CloudClientConfig from environment variables. 
149 150 This factory method resolves credentials from environment variables, 151 providing a convenient way to create credentials without explicitly 152 passing secrets. 153 154 Environment variables used: 155 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 156 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 157 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 158 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 159 160 The method will first check for a bearer token. If not found, it will 161 attempt to use client credentials. 162 163 Args: 164 api_root: The API root URL. If not provided, will be resolved from 165 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 166 the Airbyte Cloud API. 167 168 Returns: 169 A CloudClientConfig instance configured with credentials from the environment. 170 171 Raises: 172 PyAirbyteSecretNotFoundError: If required credentials are not found in 173 the environment. 174 """ 175 resolved_api_root = resolve_cloud_api_url(api_root) 176 177 # Try bearer token first 178 bearer_token = resolve_cloud_bearer_token() 179 if bearer_token: 180 return cls( 181 bearer_token=bearer_token, 182 api_root=resolved_api_root, 183 ) 184 185 # Fall back to client credentials 186 return cls( 187 client_id=resolve_cloud_client_id(), 188 client_secret=resolve_cloud_client_secret(), 189 api_root=resolved_api_root, 190 )
Client configuration for Airbyte Cloud API.
This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:
- OAuth2 client credentials flow (client_id + client_secret)
- Bearer token authentication
Exactly one authentication method must be provided. Providing both or neither will raise a validation error.
Attributes:
- client_id: OAuth2 client ID for client credentials flow.
- client_secret: OAuth2 client secret for client credentials flow.
- bearer_token: Pre-generated bearer token for direct authentication.
- api_root: The API root URL. Defaults to Airbyte Cloud API.
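A minimal sketch of both construction paths; the secrets shown are placeholders (plain strings are wrapped into SecretString automatically in __post_init__):

```python
from airbyte.cloud import CloudClientConfig

# Bearer token authentication
config = CloudClientConfig(bearer_token="my-token")
assert config.uses_bearer_token

# OAuth2 client credentials (mutually exclusive with bearer_token)
config = CloudClientConfig(client_id="my-id", client_secret="my-secret")
assert config.uses_client_credentials
```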
132 @property 133 def uses_bearer_token(self) -> bool: 134 """Return True if using bearer token authentication.""" 135 return self.bearer_token is not None
Return True if using bearer token authentication.
137 @property 138 def uses_client_credentials(self) -> bool: 139 """Return True if using client credentials authentication.""" 140 return self.client_id is not None and self.client_secret is not None
Return True if using client credentials authentication.
142 @classmethod 143 def from_env( 144 cls, 145 *, 146 api_root: str | None = None, 147 ) -> CloudClientConfig: 148 """Create CloudClientConfig from environment variables. 149 150 This factory method resolves credentials from environment variables, 151 providing a convenient way to create credentials without explicitly 152 passing secrets. 153 154 Environment variables used: 155 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 156 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 157 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 158 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 159 160 The method will first check for a bearer token. If not found, it will 161 attempt to use client credentials. 162 163 Args: 164 api_root: The API root URL. If not provided, will be resolved from 165 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 166 the Airbyte Cloud API. 167 168 Returns: 169 A CloudClientConfig instance configured with credentials from the environment. 170 171 Raises: 172 PyAirbyteSecretNotFoundError: If required credentials are not found in 173 the environment. 174 """ 175 resolved_api_root = resolve_cloud_api_url(api_root) 176 177 # Try bearer token first 178 bearer_token = resolve_cloud_bearer_token() 179 if bearer_token: 180 return cls( 181 bearer_token=bearer_token, 182 api_root=resolved_api_root, 183 ) 184 185 # Fall back to client credentials 186 return cls( 187 client_id=resolve_cloud_client_id(), 188 client_secret=resolve_cloud_client_secret(), 189 api_root=resolved_api_root, 190 )
Create CloudClientConfig from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.
Environment variables used:
- AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
- AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
- AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
- AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
The method will first check for a bearer token. If not found, it will attempt to use client credentials.
Arguments:
- api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:
A CloudClientConfig instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
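For illustration, with a placeholder token set in the process environment:

```python
import os

from airbyte.cloud import CloudClientConfig

os.environ["AIRBYTE_CLOUD_BEARER_TOKEN"] = "my-token"  # placeholder; prefer a real secret manager

config = CloudClientConfig.from_env()  # bearer token takes precedence over client credentials
assert config.uses_bearer_token
```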
218@dataclass 219class SyncResult: 220 """The result of a sync operation. 221 222 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by 223 interacting with the `.CloudWorkspace` and `.CloudConnection` objects. 224 """ 225 226 workspace: CloudWorkspace 227 connection: CloudConnection 228 job_id: int 229 table_name_prefix: str = "" 230 table_name_suffix: str = "" 231 _latest_job_info: JobResponse | None = None 232 _connection_response: ConnectionResponse | None = None 233 _cache: CacheBase | None = None 234 _job_with_attempts_info: dict[str, Any] | None = None 235 236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}" 247 248 def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: 249 """Return connection info for the sync job.""" 250 if self._connection_response and not force_refresh: 251 return self._connection_response 252 253 self._connection_response = api_util.get_connection( 254 workspace_id=self.workspace.workspace_id, 255 api_root=self.workspace.api_root, 256 connection_id=self.connection.connection_id, 257 client_id=self.workspace.client_id, 258 client_secret=self.workspace.client_secret, 259 bearer_token=self.workspace.bearer_token, 260 ) 261 return self._connection_response 262 263 def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]: 264 """Return the destination configuration for the sync job.""" 265 connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh) 266 destination_response = api_util.get_destination( 267 destination_id=connection_info.destination_id, 268 api_root=self.workspace.api_root, 269 client_id=self.workspace.client_id, 270 client_secret=self.workspace.client_secret, 271 bearer_token=self.workspace.bearer_token, 272 ) 273 return asdict(destination_response.configuration) 274 275 def is_job_complete(self) -> bool: 276 """Check if the sync job is complete.""" 277 return self.get_job_status() in FINAL_STATUSES 278 279 def get_job_status(self) -> JobStatusEnum: 280 """Return the latest status of the sync job.""" 281 return self._fetch_latest_job_info().status 282 283 def _fetch_latest_job_info(self) -> JobResponse: 284 """Return the job info for the sync job.""" 285 if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES: 286 return self._latest_job_info 287 288 self._latest_job_info = api_util.get_job_info( 289 job_id=self.job_id, 290 api_root=self.workspace.api_root, 291 client_id=self.workspace.client_id, 292 client_secret=self.workspace.client_secret, 293 bearer_token=self.workspace.bearer_token, 294 ) 295 return self._latest_job_info 296 297 @property 298 def bytes_synced(self) -> int: 299 """Return the number of bytes synced.""" 300 return self._fetch_latest_job_info().bytes_synced or 0 301 302 @property 303 def records_synced(self) -> int: 304 """Return the number of records processed.""" 305 return self._fetch_latest_job_info().rows_synced or 0 306 307 @property 308 def start_time(self) -> datetime: 309 """Return the start time of the sync job in UTC.""" 310 try: 311 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 312 except (ValueError, TypeError) as e: 313 if "Invalid isoformat string" in str(e): 314 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 315 api_root=self.workspace.api_root, 316 path="/jobs/get", 317 json={"id": self.job_id}, 318 client_id=self.workspace.client_id, 319 client_secret=self.workspace.client_secret, 320 bearer_token=self.workspace.bearer_token, 321 ) 322 raw_start_time = job_info_raw.get("startTime") 323 if raw_start_time: 324 return ab_datetime_parse(raw_start_time) 325 raise 326 327 def _fetch_job_with_attempts(self) -> dict[str, Any]: 328 """Fetch job info with attempts from Config API using lazy loading pattern.""" 329 if self._job_with_attempts_info is not None: 330 return self._job_with_attempts_info 331 332 self._job_with_attempts_info = api_util._make_config_api_request( # noqa: SLF001 # Config API helper 333 api_root=self.workspace.api_root, 334 path="/jobs/get", 335 json={ 336 "id": self.job_id, 337 }, 338 client_id=self.workspace.client_id, 339 client_secret=self.workspace.client_secret, 340 bearer_token=self.workspace.bearer_token, 341 ) 342 return self._job_with_attempts_info 343 344 def get_attempts(self) -> list[SyncAttempt]: 345 """Return a list of attempts for this sync job.""" 346 job_with_attempts = self._fetch_job_with_attempts() 347 attempts_data = job_with_attempts.get("attempts", []) 348 349 return [ 350 SyncAttempt( 351 workspace=self.workspace, 352 connection=self.connection, 353 job_id=self.job_id, 354 attempt_number=i, 355 _attempt_data=attempt_data, 356 ) 357 for i, attempt_data in enumerate(attempts_data, start=0) 358 ] 359 360 def raise_failure_status( 361 self, 362 *, 363 refresh_status: bool = False, 364 ) -> None: 365 """Raise an exception if the sync job failed. 366 367 By default, this method will use the latest status available. If you want to refresh the 368 status before checking for failure, set `refresh_status=True`. If the job has failed, this 369 method will raise an `AirbyteConnectionSyncError`. 370 371 Otherwise, do nothing.
372 """ 373 if not refresh_status and self._latest_job_info: 374 latest_status = self._latest_job_info.status 375 else: 376 latest_status = self.get_job_status() 377 378 if latest_status in FAILED_STATUSES: 379 raise AirbyteConnectionSyncError( 380 workspace=self.workspace, 381 connection_id=self.connection.connection_id, 382 job_id=self.job_id, 383 job_status=self.get_job_status(), 384 ) 385 386 def wait_for_completion( 387 self, 388 *, 389 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 390 raise_timeout: bool = True, 391 raise_failure: bool = False, 392 ) -> JobStatusEnum: 393 """Wait for a job to finish running.""" 394 start_time = time.time() 395 while True: 396 latest_status = self.get_job_status() 397 if latest_status in FINAL_STATUSES: 398 if raise_failure: 399 # No-op if the job succeeded or is still running: 400 self.raise_failure_status() 401 402 return latest_status 403 404 if time.time() - start_time > wait_timeout: 405 if raise_timeout: 406 raise AirbyteConnectionSyncTimeoutError( 407 workspace=self.workspace, 408 connection_id=self.connection.connection_id, 409 job_id=self.job_id, 410 job_status=latest_status, 411 timeout=wait_timeout, 412 ) 413 414 return latest_status # This will be a non-final status 415 416 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS) 417 418 def get_sql_cache(self) -> CacheBase: 419 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 420 if self._cache: 421 return self._cache 422 423 destination_configuration = self._get_destination_configuration() 424 self._cache = destination_to_cache(destination_configuration=destination_configuration) 425 return self._cache 426 427 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 428 """Return a SQL Engine for querying a SQL-based destination.""" 429 return self.get_sql_cache().get_sql_engine() 430 431 def get_sql_table_name(self, stream_name: str) -> str: 432 """Return the SQL table name of the named stream.""" 433 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name) 434 435 def get_sql_table( 436 self, 437 stream_name: str, 438 ) -> sqlalchemy.Table: 439 """Return a SQLAlchemy table object for the named stream.""" 440 return self.get_sql_cache().processor.get_sql_table(stream_name) 441 442 def get_dataset(self, stream_name: str) -> CachedDataset: 443 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 444 445 This can be used to read and analyze the data in a SQL-based destination. 446 447 TODO: In a future iteration, we can consider providing stream configuration information 448 (catalog information) to the `CachedDataset` object via the "Get stream properties" 449 API: https://reference.airbyte.com/reference/getstreamproperties 450 """ 451 return CachedDataset( 452 self.get_sql_cache(), 453 stream_name=stream_name, 454 stream_configuration=False, # Don't look for stream configuration in cache. 
455 ) 456 457 def get_sql_database_name(self) -> str: 458 """Return the SQL database name.""" 459 cache = self.get_sql_cache() 460 return cache.get_database_name() 461 462 def get_sql_schema_name(self) -> str: 463 """Return the SQL schema name.""" 464 cache = self.get_sql_cache() 465 return cache.schema_name 466 467 @property 468 def stream_names(self) -> list[str]: 469 """Return the set of stream names.""" 470 return self.connection.stream_names 471 472 @final 473 @property 474 def streams( 475 self, 476 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 477 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 478 479 This is a convenience wrapper around the `stream_names` 480 property and `get_dataset()` method. 481 """ 482 return self._SyncResultStreams(self) 483 484 class _SyncResultStreams(Mapping[str, CachedDataset]): 485 """A mapping of stream names to cached datasets.""" 486 487 def __init__( 488 self, 489 parent: SyncResult, 490 /, 491 ) -> None: 492 self.parent: SyncResult = parent 493 494 def __getitem__(self, key: str) -> CachedDataset: 495 return self.parent.get_dataset(stream_name=key) 496 497 def __iter__(self) -> Iterator[str]: 498 return iter(self.parent.stream_names) 499 500 def __len__(self) -> int: 501 return len(self.parent.stream_names)
The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a `SyncResult` by interacting with the `CloudWorkspace` and `CloudConnection` objects.
@property
def job_url(self) -> str:
    """Return the URL of the sync job.

    Note: This currently returns the connection's job history URL, as there is no direct URL
    to a specific job in the Airbyte Cloud web app.

    TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
    E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
    """
    return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
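For example, you might surface this link after kicking off a sync. A minimal sketch, assuming a `connection` object already exists as in the module examples:

```python
# Print a link to the job's history page in the Airbyte Cloud web app:
sync_result = connection.get_sync_result()
print(f"View the job here: {sync_result.job_url}")
```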
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
def get_job_status(self) -> JobStatusEnum:
    """Return the latest status of the sync job."""
    return self._fetch_latest_job_info().status
Return the latest status of the sync job.
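A lightweight polling loop built on these two methods might look like this. A sketch only; `wait_for_completion()` (below) is the preferred way to block on a job:

```python
import time

sync_result = connection.get_sync_result()
while not sync_result.is_job_complete():
    print(f"Still running, status: {sync_result.get_job_status()}")
    time.sleep(10)  # Poll interval chosen arbitrarily for this example.

print(f"Final status: {sync_result.get_job_status()}")
```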
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced or 0
Return the number of bytes synced.
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced or 0
Return the number of records synced.
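Together, these two properties give a quick summary of a finished job. A minimal sketch (the message formatting is illustrative only):

```python
sync_result = connection.get_sync_result()
print(
    f"Synced {sync_result.records_synced:,} records "
    f"({sync_result.bytes_synced:,} bytes)."
)
```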
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        # Fall back to the Config API if the public API returned a timestamp
        # that `ab_datetime_parse` cannot handle:
        if "Invalid isoformat string" in str(e):
            job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                api_root=self.workspace.api_root,
                path="/jobs/get",
                json={"id": self.job_id},
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
            raw_start_time = job_info_raw.get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
Return the start time of the sync job in UTC.
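For example, `start_time` can be combined with the current time to report a rough job age. A sketch, assuming the property returns a timezone-aware datetime as the UTC note suggests:

```python
from datetime import datetime, timezone

sync_result = connection.get_sync_result()
age = datetime.now(timezone.utc) - sync_result.start_time
print(f"Job started {age.total_seconds():.0f} seconds ago.")
```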
def get_attempts(self) -> list[SyncAttempt]:
    """Return a list of attempts for this sync job."""
    job_with_attempts = self._fetch_job_with_attempts()
    attempts_data = job_with_attempts.get("attempts", [])

    return [
        SyncAttempt(
            workspace=self.workspace,
            connection=self.connection,
            job_id=self.job_id,
            attempt_number=i,
            _attempt_data=attempt_data,
        )
        for i, attempt_data in enumerate(attempts_data, start=0)
    ]
Return a list of attempts for this sync job.
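For example, to report how many attempts a job needed. A sketch; it assumes each `SyncAttempt` exposes the `attempt_number` passed to its constructor above:

```python
sync_result = connection.get_sync_result()
attempts = sync_result.get_attempts()
print(f"Job ran {len(attempts)} attempt(s).")
for attempt in attempts:
    print(f"- Attempt #{attempt.attempt_number}")  # Assumed attribute.
```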
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
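Typical usage wraps the call in a `try`/`except` block. A sketch, assuming `AirbyteConnectionSyncError` is importable from `airbyte.exceptions` (the import path is not shown in this module):

```python
from airbyte.exceptions import AirbyteConnectionSyncError  # Assumed import path.

sync_result = connection.get_sync_result()
try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
else:
    print("No failure detected.")
```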
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
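For example, to block on a running job but handle the timeout case yourself rather than raising. A minimal sketch:

```python
sync_result = connection.run_sync()
final_status = sync_result.wait_for_completion(
    wait_timeout=1800,    # Stop waiting after 30 minutes...
    raise_timeout=False,  # ...and return the non-final status instead of raising.
    raise_failure=True,   # Raise AirbyteConnectionSyncError if the job failed.
)
print(f"Status after waiting: {final_status}")
```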
def get_sql_cache(self) -> CacheBase:
    """Return a SQL cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
Return a SQL cache object for working with the data in a SQL-based destination.
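Note that the cache object is memoized, so repeated calls are cheap and return the same instance. A minimal sketch:

```python
sync_result = connection.get_sync_result()
cache = sync_result.get_sql_cache()

# Subsequent calls return the same cached object:
assert cache is sync_result.get_sql_cache()
```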
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
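For example, the engine can be used for ad-hoc SQL against the destination, together with `get_sql_table_name()` below. A sketch; the "users" stream name is hypothetical:

```python
from sqlalchemy import text

sync_result = connection.get_sync_result()
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")  # Hypothetical stream name.

with engine.connect() as conn:
    row_count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar_one()

print(f"{table_name} contains {row_count} rows.")
```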
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
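Because the return value is a plain `sqlalchemy.Table`, it composes with SQLAlchemy's query builder. A sketch; the "users" stream name is hypothetical:

```python
from sqlalchemy import select

sync_result = connection.get_sync_result()
users_table = sync_result.get_sql_table("users")  # Hypothetical stream name.

stmt = select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```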
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
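For analysis workflows, the dataset can be loaded into a DataFrame. A sketch; `to_pandas()` is assumed here as part of the `CachedDataset` interface, and the "users" stream name is hypothetical:

```python
sync_result = connection.get_sync_result()
dataset = sync_result.get_dataset("users")  # Hypothetical stream name.

# Load the stream into a pandas DataFrame (assumed CachedDataset capability):
df = dataset.to_pandas()
print(df.describe())
```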
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
Return the SQL database name.
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
Return the SQL schema name.
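These two helpers are useful for building fully-qualified table references. A minimal sketch; the "users" stream name is hypothetical, and the three-part naming convention depends on the destination:

```python
sync_result = connection.get_sync_result()
qualified_name = ".".join(
    [
        sync_result.get_sql_database_name(),
        sync_result.get_sql_schema_name(),
        sync_result.get_sql_table_name("users"),  # Hypothetical stream name.
    ]
)
print(f"Fully-qualified table: {qualified_name}")
```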
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names."""
    return self.connection.stream_names
Return the list of stream names.
@final
@property
def streams(
    self,
) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
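Because `streams` behaves like a read-only mapping, standard dict-style access works. A minimal sketch; the "users" stream name is hypothetical:

```python
sync_result = connection.get_sync_result()

# Dict-style lookup of a single stream:
users_dataset = sync_result.streams["users"]  # Hypothetical stream name.

# The mapping iterates over all stream names in the connection:
for stream_name in sync_result.streams:
    print(stream_name)
```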
class JobStatusEnum(str, Enum):
    """Enumeration of the possible statuses of an Airbyte sync job."""

    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
Enumeration of the possible statuses of an Airbyte sync job.
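Because the enum subclasses `str`, members also compare equal to their raw string values. A minimal sketch:

```python
from airbyte.cloud.constants import JobStatusEnum

status = connection.get_sync_result().get_job_status()

if status is JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif status == "failed":  # str-enum members compare equal to plain strings.
    print("Sync failed.")
```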