airbyte.cloud
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.
Examples
Basic Sync Example:
import airbyte as ab
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
Example Read From Cloud Destination:
If your destination is supported, you can read records directly from the
SyncResult object. Currently this is supported in Snowflake and BigQuery only.
# Assuming we've already created a `connection` object...
# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
print(record)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.


```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import (
        client_config,
        connections,
        constants,
        sync_results,
        workspaces,
    )


# Public API of `airbyte.cloud`, consumed by `import *` and documentation tools.
__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        client_id="...",
        client_secret="...",
    )
    ```

    Example with bearer token:
    ```python
    workspace = CloudWorkspace(
        workspace_id="...",
        bearer_token="...",
    )
    ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
        - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
        - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
        - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
        - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
        - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str:
        """The web URL of the workspace."""
        # Always a string; the URL is built unconditionally from api_root + workspace_id.
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            # Use `connector_id`, consistent with how deployed connectors are
            # referenced elsewhere in this class (e.g. `deploy_connection`).
            destination_id=(
                destination if isinstance(destination, str) else destination.connector_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        # Cascade deletes run after the connection itself is removed.
        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            # Normalize the manifest input (Path / str / dict) to a dict.
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                # Unreachable given is_yaml above; kept for type narrowing.
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
Two authentication methods are supported (mutually exclusive):
- OAuth2 client credentials (client_id + client_secret)
- Bearer token authentication
Example with client credentials:
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
Example with bearer token:
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
) -> CloudWorkspace:
    """Build a `CloudWorkspace` from environment-variable credentials.

    Credential resolution order (the two methods are mutually exclusive):
    1. Bearer token, if `AIRBYTE_CLOUD_BEARER_TOKEN` is set.
    2. OAuth2 client credentials, from `AIRBYTE_CLOUD_CLIENT_ID` and
       `AIRBYTE_CLOUD_CLIENT_SECRET`.

    Other environment variables consulted:
    - `AIRBYTE_CLOUD_WORKSPACE_ID`: Used when `workspace_id` is not passed.
    - `AIRBYTE_CLOUD_API_URL`: Optional API root override (defaults to Airbyte Cloud).

    Args:
        workspace_id: Explicit workspace ID; falls back to the
            `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable when omitted.
        api_root: Explicit API root URL; falls back to the
            `AIRBYTE_CLOUD_API_URL` environment variable, then the Airbyte
            Cloud default.

    Returns:
        A `CloudWorkspace` configured from the environment.

    Raises:
        PyAirbyteSecretNotFoundError: If required credentials cannot be resolved.

    Example:
        ```python
        # With workspace_id from environment
        workspace = CloudWorkspace.from_env()

        # With explicit workspace_id
        workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
        ```
    """
    root_url = resolve_cloud_api_url(api_root)

    # Bearer token wins when present; otherwise use OAuth2 client credentials.
    if token := resolve_cloud_bearer_token():
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            bearer_token=token,
            api_root=root_url,
        )

    return cls(
        workspace_id=resolve_cloud_workspace_id(workspace_id),
        client_id=resolve_cloud_client_id(),
        client_secret=resolve_cloud_client_secret(),
        api_root=root_url,
    )
Create a CloudWorkspace using credentials from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.
Two authentication methods are supported (mutually exclusive):
- Bearer token (checked first)
- OAuth2 client credentials (fallback)
Environment variables used:
- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
- `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
Arguments:
- workspace_id: The workspace ID. If not provided, will be resolved from the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
- api_root: The API root URL. If not provided, will be resolved from the `AIRBYTE_CLOUD_API_URL` environment variable, or default to the Airbyte Cloud API.
Returns:
A CloudWorkspace instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
332 @property 333 def workspace_url(self) -> str | None: 334 """The web URL of the workspace.""" 335 return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
The web URL of the workspace.
369 def get_organization( 370 self, 371 *, 372 raise_on_error: bool = True, 373 ) -> CloudOrganization | None: 374 """Get the organization this workspace belongs to. 375 376 Fetching organization info requires ORGANIZATION_READER permissions on the organization, 377 which may not be available with workspace-scoped credentials. 378 379 Args: 380 raise_on_error: If True (default), raises AirbyteError on permission or API errors. 381 If False, returns None instead of raising. 382 383 Returns: 384 CloudOrganization object with organization_id and organization_name, 385 or None if raise_on_error=False and an error occurred. 386 387 Raises: 388 AirbyteError: If raise_on_error=True and the organization info cannot be fetched 389 (e.g., due to insufficient permissions or missing data). 390 """ 391 try: 392 info = self._organization_info 393 except (AirbyteError, NotImplementedError): 394 if raise_on_error: 395 raise 396 return None 397 398 organization_id = info.get("organizationId") 399 organization_name = info.get("organizationName") 400 401 # Validate that both organization_id and organization_name are non-null and non-empty 402 if not organization_id or not organization_name: 403 if raise_on_error: 404 raise AirbyteError( 405 message="Organization info is incomplete.", 406 context={ 407 "organization_id": organization_id, 408 "organization_name": organization_name, 409 }, 410 ) 411 return None 412 413 return CloudOrganization( 414 organization_id=organization_id, 415 organization_name=organization_name, 416 api_root=self.api_root, 417 client_id=self.client_id, 418 client_secret=self.client_secret, 419 bearer_token=self.bearer_token, 420 )
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
424 def connect(self) -> None: 425 """Check that the workspace is reachable and raise an exception otherwise. 426 427 Note: It is not necessary to call this method before calling other operations. It 428 serves primarily as a simple check to ensure that the workspace is reachable 429 and credentials are correct. 430 """ 431 _ = api_util.get_workspace( 432 api_root=self.api_root, 433 workspace_id=self.workspace_id, 434 client_id=self.client_id, 435 client_secret=self.client_secret, 436 bearer_token=self.bearer_token, 437 ) 438 print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
442 def get_connection( 443 self, 444 connection_id: str, 445 ) -> CloudConnection: 446 """Get a connection by ID. 447 448 This method does not fetch data from the API. It returns a `CloudConnection` object, 449 which will be loaded lazily as needed. 450 """ 451 return CloudConnection( 452 workspace=self, 453 connection_id=connection_id, 454 )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
456 def get_source( 457 self, 458 source_id: str, 459 ) -> CloudSource: 460 """Get a source by ID. 461 462 This method does not fetch data from the API. It returns a `CloudSource` object, 463 which will be loaded lazily as needed. 464 """ 465 return CloudSource( 466 workspace=self, 467 connector_id=source_id, 468 )
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
470 def get_destination( 471 self, 472 destination_id: str, 473 ) -> CloudDestination: 474 """Get a destination by ID. 475 476 This method does not fetch data from the API. It returns a `CloudDestination` object, 477 which will be loaded lazily as needed. 478 """ 479 return CloudDestination( 480 workspace=self, 481 connector_id=destination_id, 482 )
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
486 def deploy_source( 487 self, 488 name: str, 489 source: Source, 490 *, 491 unique: bool = True, 492 random_name_suffix: bool = False, 493 ) -> CloudSource: 494 """Deploy a source to the workspace. 495 496 Returns the newly deployed source. 497 498 Args: 499 name: The name to use when deploying. 500 source: The source object to deploy. 501 unique: Whether to require a unique name. If `True`, duplicate names 502 are not allowed. Defaults to `True`. 503 random_name_suffix: Whether to append a random suffix to the name. 504 """ 505 source_config_dict = source._hydrated_config.copy() # noqa: SLF001 (non-public API) 506 source_config_dict["sourceType"] = source.name.replace("source-", "") 507 508 if random_name_suffix: 509 name += f" (ID: {text_util.generate_random_suffix()})" 510 511 if unique: 512 existing = self.list_sources(name=name) 513 if existing: 514 raise exc.AirbyteDuplicateResourcesError( 515 resource_type="source", 516 resource_name=name, 517 ) 518 519 deployed_source = api_util.create_source( 520 name=name, 521 api_root=self.api_root, 522 workspace_id=self.workspace_id, 523 config=source_config_dict, 524 client_id=self.client_id, 525 client_secret=self.client_secret, 526 bearer_token=self.bearer_token, 527 ) 528 return CloudSource( 529 workspace=self, 530 connector_id=deployed_source.source_id, 531 )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
533 def deploy_destination( 534 self, 535 name: str, 536 destination: Destination | dict[str, Any], 537 *, 538 unique: bool = True, 539 random_name_suffix: bool = False, 540 ) -> CloudDestination: 541 """Deploy a destination to the workspace. 542 543 Returns the newly deployed destination ID. 544 545 Args: 546 name: The name to use when deploying. 547 destination: The destination to deploy. Can be a local Airbyte `Destination` object or a 548 dictionary of configuration values. 549 unique: Whether to require a unique name. If `True`, duplicate names 550 are not allowed. Defaults to `True`. 551 random_name_suffix: Whether to append a random suffix to the name. 552 """ 553 if isinstance(destination, Destination): 554 destination_conf_dict = destination._hydrated_config.copy() # noqa: SLF001 (non-public API) 555 destination_conf_dict["destinationType"] = destination.name.replace("destination-", "") 556 # raise ValueError(destination_conf_dict) 557 else: 558 destination_conf_dict = destination.copy() 559 if "destinationType" not in destination_conf_dict: 560 raise exc.PyAirbyteInputError( 561 message="Missing `destinationType` in configuration dictionary.", 562 ) 563 564 if random_name_suffix: 565 name += f" (ID: {text_util.generate_random_suffix()})" 566 567 if unique: 568 existing = self.list_destinations(name=name) 569 if existing: 570 raise exc.AirbyteDuplicateResourcesError( 571 resource_type="destination", 572 resource_name=name, 573 ) 574 575 deployed_destination = api_util.create_destination( 576 name=name, 577 api_root=self.api_root, 578 workspace_id=self.workspace_id, 579 config=destination_conf_dict, # Wants a dataclass but accepts dict 580 client_id=self.client_id, 581 client_secret=self.client_secret, 582 bearer_token=self.bearer_token, 583 ) 584 return CloudDestination( 585 workspace=self, 586 connector_id=deployed_destination.destination_id, 587 )
Deploy a destination to the workspace.
Returns the newly deployed destination ID.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
589 def permanently_delete_source( 590 self, 591 source: str | CloudSource, 592 *, 593 safe_mode: bool = True, 594 ) -> None: 595 """Delete a source from the workspace. 596 597 You can pass either the source ID `str` or a deployed `Source` object. 598 599 Args: 600 source: The source ID or CloudSource object to delete 601 safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" 602 (case insensitive) to prevent accidental deletion. Defaults to True. 603 """ 604 if not isinstance(source, (str, CloudSource)): 605 raise exc.PyAirbyteInputError( 606 message="Invalid source type.", 607 input_value=type(source).__name__, 608 ) 609 610 api_util.delete_source( 611 source_id=source.connector_id if isinstance(source, CloudSource) else source, 612 source_name=source.name if isinstance(source, CloudSource) else None, 613 api_root=self.api_root, 614 client_id=self.client_id, 615 client_secret=self.client_secret, 616 bearer_token=self.bearer_token, 617 safe_mode=safe_mode, 618 )
Delete a source from the workspace.
You can pass either the source ID str or a deployed Source object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
622 def permanently_delete_destination( 623 self, 624 destination: str | CloudDestination, 625 *, 626 safe_mode: bool = True, 627 ) -> None: 628 """Delete a deployed destination from the workspace. 629 630 You can pass either the `Cache` class or the deployed destination ID as a `str`. 631 632 Args: 633 destination: The destination ID or CloudDestination object to delete 634 safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" 635 (case insensitive) to prevent accidental deletion. Defaults to True. 636 """ 637 if not isinstance(destination, (str, CloudDestination)): 638 raise exc.PyAirbyteInputError( 639 message="Invalid destination type.", 640 input_value=type(destination).__name__, 641 ) 642 643 api_util.delete_destination( 644 destination_id=( 645 destination if isinstance(destination, str) else destination.destination_id 646 ), 647 destination_name=( 648 destination.name if isinstance(destination, CloudDestination) else None 649 ), 650 api_root=self.api_root, 651 client_id=self.client_id, 652 client_secret=self.client_secret, 653 bearer_token=self.bearer_token, 654 safe_mode=safe_mode, 655 )
Delete a deployed destination from the workspace.
You can pass either a CloudDestination object or the deployed destination ID as a str.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
659 def deploy_connection( 660 self, 661 connection_name: str, 662 *, 663 source: CloudSource | str, 664 selected_streams: list[str], 665 destination: CloudDestination | str, 666 table_prefix: str | None = None, 667 ) -> CloudConnection: 668 """Create a new connection between an already deployed source and destination. 669 670 Returns the newly deployed connection object. 671 672 Args: 673 connection_name: The name of the connection. 674 source: The deployed source. You can pass a source ID or a CloudSource object. 675 destination: The deployed destination. You can pass a destination ID or a 676 CloudDestination object. 677 table_prefix: Optional. The table prefix to use when syncing to the destination. 678 selected_streams: The selected stream names to sync within the connection. 679 """ 680 if not selected_streams: 681 raise exc.PyAirbyteInputError( 682 guidance="You must provide `selected_streams` when creating a connection." 683 ) 684 685 source_id: str = source if isinstance(source, str) else source.connector_id 686 destination_id: str = ( 687 destination if isinstance(destination, str) else destination.connector_id 688 ) 689 690 deployed_connection = api_util.create_connection( 691 name=connection_name, 692 source_id=source_id, 693 destination_id=destination_id, 694 api_root=self.api_root, 695 workspace_id=self.workspace_id, 696 selected_stream_names=selected_streams, 697 prefix=table_prefix or "", 698 client_id=self.client_id, 699 client_secret=self.client_secret, 700 bearer_token=self.bearer_token, 701 ) 702 703 return CloudConnection( 704 workspace=self, 705 connection_id=deployed_connection.connection_id, 706 source=deployed_connection.source_id, 707 destination=deployed_connection.destination_id, 708 )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
710 def permanently_delete_connection( 711 self, 712 connection: str | CloudConnection, 713 *, 714 cascade_delete_source: bool = False, 715 cascade_delete_destination: bool = False, 716 safe_mode: bool = True, 717 ) -> None: 718 """Delete a deployed connection from the workspace. 719 720 Args: 721 connection: The connection ID or CloudConnection object to delete 722 cascade_delete_source: If True, also delete the source after deleting the connection 723 cascade_delete_destination: If True, also delete the destination after deleting 724 the connection 725 safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" 726 (case insensitive) to prevent accidental deletion. Defaults to True. Also applies 727 to cascade deletes. 728 """ 729 if connection is None: 730 raise ValueError("No connection ID provided.") 731 732 if isinstance(connection, str): 733 connection = CloudConnection( 734 workspace=self, 735 connection_id=connection, 736 ) 737 738 api_util.delete_connection( 739 connection_id=connection.connection_id, 740 connection_name=connection.name, 741 api_root=self.api_root, 742 workspace_id=self.workspace_id, 743 client_id=self.client_id, 744 client_secret=self.client_secret, 745 bearer_token=self.bearer_token, 746 safe_mode=safe_mode, 747 ) 748 749 if cascade_delete_source: 750 self.permanently_delete_source( 751 source=connection.source_id, 752 safe_mode=safe_mode, 753 ) 754 if cascade_delete_destination: 755 self.permanently_delete_destination( 756 destination=connection.destination_id, 757 safe_mode=safe_mode, 758 )
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
762 def list_connections( 763 self, 764 name: str | None = None, 765 *, 766 name_filter: Callable | None = None, 767 ) -> list[CloudConnection]: 768 """List connections by name in the workspace. 769 770 TODO: Add pagination support 771 """ 772 connections = api_util.list_connections( 773 api_root=self.api_root, 774 workspace_id=self.workspace_id, 775 name=name, 776 name_filter=name_filter, 777 client_id=self.client_id, 778 client_secret=self.client_secret, 779 bearer_token=self.bearer_token, 780 ) 781 return [ 782 CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) 783 workspace=self, 784 connection_response=connection, 785 ) 786 for connection in connections 787 if name is None or connection.name == name 788 ]
List connections by name in the workspace.
TODO: Add pagination support
790 def list_sources( 791 self, 792 name: str | None = None, 793 *, 794 name_filter: Callable | None = None, 795 ) -> list[CloudSource]: 796 """List all sources in the workspace. 797 798 TODO: Add pagination support 799 """ 800 sources = api_util.list_sources( 801 api_root=self.api_root, 802 workspace_id=self.workspace_id, 803 name=name, 804 name_filter=name_filter, 805 client_id=self.client_id, 806 client_secret=self.client_secret, 807 bearer_token=self.bearer_token, 808 ) 809 return [ 810 CloudSource._from_source_response( # noqa: SLF001 (non-public API) 811 workspace=self, 812 source_response=source, 813 ) 814 for source in sources 815 if name is None or source.name == name 816 ]
List all sources in the workspace.
TODO: Add pagination support
818 def list_destinations( 819 self, 820 name: str | None = None, 821 *, 822 name_filter: Callable | None = None, 823 ) -> list[CloudDestination]: 824 """List all destinations in the workspace. 825 826 TODO: Add pagination support 827 """ 828 destinations = api_util.list_destinations( 829 api_root=self.api_root, 830 workspace_id=self.workspace_id, 831 name=name, 832 name_filter=name_filter, 833 client_id=self.client_id, 834 client_secret=self.client_secret, 835 bearer_token=self.bearer_token, 836 ) 837 return [ 838 CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) 839 workspace=self, 840 destination_response=destination, 841 ) 842 for destination in destinations 843 if name is None or destination.name == name 844 ]
List all destinations in the workspace.
TODO: Add pagination support
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
        AirbyteDuplicateResourcesError: If unique=True and name already exists
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    # Exactly one of the two definition styles must be selected.
    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing = self.list_custom_source_definitions(
            definition_type="yaml" if is_yaml else "docker",
        )
        if any(definition.name == name for definition in existing):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if is_yaml:
        # Normalize the manifest input (Path / str / dict) into a dict.
        parsed_manifest: dict[str, Any]
        if isinstance(manifest_yaml, Path):
            parsed_manifest = yaml.safe_load(manifest_yaml.read_text())
        elif isinstance(manifest_yaml, str):
            parsed_manifest = yaml.safe_load(manifest_yaml)
        elif manifest_yaml is not None:
            parsed_manifest = manifest_yaml
        else:
            # Unreachable when is_yaml is True; kept for type narrowing.
            raise exc.PyAirbyteInputError(
                message="manifest_yaml is required for YAML connectors",
                context={"name": name},
            )

        if pre_validate:
            api_util.validate_yaml_manifest(parsed_manifest, raise_on_error=True)

        created = api_util.create_custom_yaml_source_definition(
            name=name,
            workspace_id=self.workspace_id,
            manifest=parsed_manifest,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
            self, created
        )

        # Set testing values if provided
        if testing_values is not None:
            definition.set_testing_values(testing_values)

        return definition

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
953 def list_custom_source_definitions( 954 self, 955 *, 956 definition_type: Literal["yaml", "docker"], 957 ) -> list[CustomCloudSourceDefinition]: 958 """List custom source connector definitions. 959 960 Args: 961 definition_type: Connector type to list ("yaml" or "docker"). Required. 962 963 Returns: 964 List of CustomCloudSourceDefinition objects matching the specified type 965 """ 966 if definition_type == "yaml": 967 yaml_definitions = api_util.list_custom_yaml_source_definitions( 968 workspace_id=self.workspace_id, 969 api_root=self.api_root, 970 client_id=self.client_id, 971 client_secret=self.client_secret, 972 bearer_token=self.bearer_token, 973 ) 974 return [ 975 CustomCloudSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 976 for d in yaml_definitions 977 ] 978 979 raise NotImplementedError( 980 "Docker custom source definitions are not yet supported. " 981 "Only YAML manifest-based custom sources are currently available." 982 )
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
984 def get_custom_source_definition( 985 self, 986 definition_id: str, 987 *, 988 definition_type: Literal["yaml", "docker"], 989 ) -> CustomCloudSourceDefinition: 990 """Get a specific custom source definition by ID. 991 992 Args: 993 definition_id: The definition ID 994 definition_type: Connector type ("yaml" or "docker"). Required. 995 996 Returns: 997 CustomCloudSourceDefinition object 998 """ 999 if definition_type == "yaml": 1000 result = api_util.get_custom_yaml_source_definition( 1001 workspace_id=self.workspace_id, 1002 definition_id=definition_id, 1003 api_root=self.api_root, 1004 client_id=self.client_id, 1005 client_secret=self.client_secret, 1006 bearer_token=self.bearer_token, 1007 ) 1008 return CustomCloudSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 1009 1010 raise NotImplementedError( 1011 "Docker custom source definitions are not yet supported. " 1012 "Only YAML manifest-based custom sources are currently available." 1013 )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
40class CloudConnection: # noqa: PLR0904 # Too many public methods 41 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 42 43 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 44 """ 45 46 def __init__( 47 self, 48 workspace: CloudWorkspace, 49 connection_id: str, 50 source: str | None = None, 51 destination: str | None = None, 52 ) -> None: 53 """It is not recommended to create a `CloudConnection` object directly. 54 55 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 56 """ 57 self.connection_id = connection_id 58 """The ID of the connection.""" 59 60 self.workspace = workspace 61 """The workspace that the connection belongs to.""" 62 63 self._source_id = source 64 """The ID of the source.""" 65 66 self._destination_id = destination 67 """The ID of the destination.""" 68 69 self._connection_info: ConnectionResponse | None = None 70 """The connection info object. (Cached.)""" 71 72 self._cloud_source_object: CloudSource | None = None 73 """The source object. (Cached.)""" 74 75 self._cloud_destination_object: CloudDestination | None = None 76 """The destination object. (Cached.)""" 77 78 def _fetch_connection_info( 79 self, 80 *, 81 force_refresh: bool = False, 82 verify: bool = True, 83 ) -> ConnectionResponse: 84 """Fetch and cache connection info from the API. 85 86 By default, this method will only fetch from the API if connection info is not 87 already cached. It also verifies that the connection belongs to the expected 88 workspace unless verification is explicitly disabled. 89 90 Args: 91 force_refresh: If True, always fetch from the API even if cached. 92 If False (default), only fetch if not already cached. 93 verify: If True (default), verify that the connection is valid (e.g., that 94 the workspace_id matches this object's workspace). Raises an error if 95 validation fails. 96 97 Returns: 98 The ConnectionResponse from the API. 
99 100 Raises: 101 AirbyteWorkspaceMismatchError: If verify is True and the connection's 102 workspace_id doesn't match the expected workspace. 103 AirbyteMissingResourceError: If the connection doesn't exist. 104 """ 105 if not force_refresh and self._connection_info is not None: 106 # Use cached info, but still verify if requested 107 if verify: 108 self._verify_workspace_match(self._connection_info) 109 return self._connection_info 110 111 # Fetch from API 112 connection_info = api_util.get_connection( 113 workspace_id=self.workspace.workspace_id, 114 connection_id=self.connection_id, 115 api_root=self.workspace.api_root, 116 client_id=self.workspace.client_id, 117 client_secret=self.workspace.client_secret, 118 bearer_token=self.workspace.bearer_token, 119 ) 120 121 # Cache the result first (before verification may raise) 122 self._connection_info = connection_info 123 124 # Verify if requested 125 if verify: 126 self._verify_workspace_match(connection_info) 127 128 return connection_info 129 130 def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None: 131 """Verify that the connection belongs to the expected workspace. 132 133 Raises: 134 AirbyteWorkspaceMismatchError: If the workspace IDs don't match. 135 """ 136 if connection_info.workspace_id != self.workspace.workspace_id: 137 raise AirbyteWorkspaceMismatchError( 138 resource_type="connection", 139 resource_id=self.connection_id, 140 workspace=self.workspace, 141 expected_workspace_id=self.workspace.workspace_id, 142 actual_workspace_id=connection_info.workspace_id, 143 message=( 144 f"Connection '{self.connection_id}' belongs to workspace " 145 f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'." 146 ), 147 ) 148 149 def check_is_valid(self) -> bool: 150 """Check if this connection exists and belongs to the expected workspace. 
151 152 This method fetches connection info from the API (if not already cached) and 153 verifies that the connection's workspace_id matches the workspace associated 154 with this CloudConnection object. 155 156 Returns: 157 True if the connection exists and belongs to the expected workspace. 158 159 Raises: 160 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 161 AirbyteMissingResourceError: If the connection doesn't exist. 162 """ 163 self._fetch_connection_info(force_refresh=False, verify=True) 164 return True 165 166 @classmethod 167 def _from_connection_response( 168 cls, 169 workspace: CloudWorkspace, 170 connection_response: ConnectionResponse, 171 ) -> CloudConnection: 172 """Create a CloudConnection from a ConnectionResponse.""" 173 result = cls( 174 workspace=workspace, 175 connection_id=connection_response.connection_id, 176 source=connection_response.source_id, 177 destination=connection_response.destination_id, 178 ) 179 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 180 return result 181 182 # Properties 183 184 @property 185 def name(self) -> str | None: 186 """Get the display name of the connection, if available. 187 188 E.g. "My Postgres to Snowflake", not the connection ID. 
189 """ 190 if not self._connection_info: 191 self._connection_info = self._fetch_connection_info() 192 193 return self._connection_info.name 194 195 @property 196 def source_id(self) -> str: 197 """The ID of the source.""" 198 if not self._source_id: 199 if not self._connection_info: 200 self._connection_info = self._fetch_connection_info() 201 202 self._source_id = self._connection_info.source_id 203 204 return self._source_id 205 206 @property 207 def source(self) -> CloudSource: 208 """Get the source object.""" 209 if self._cloud_source_object: 210 return self._cloud_source_object 211 212 self._cloud_source_object = CloudSource( 213 workspace=self.workspace, 214 connector_id=self.source_id, 215 ) 216 return self._cloud_source_object 217 218 @property 219 def destination_id(self) -> str: 220 """The ID of the destination.""" 221 if not self._destination_id: 222 if not self._connection_info: 223 self._connection_info = self._fetch_connection_info() 224 225 self._destination_id = self._connection_info.destination_id 226 227 return self._destination_id 228 229 @property 230 def destination(self) -> CloudDestination: 231 """Get the destination object.""" 232 if self._cloud_destination_object: 233 return self._cloud_destination_object 234 235 self._cloud_destination_object = CloudDestination( 236 workspace=self.workspace, 237 connector_id=self.destination_id, 238 ) 239 return self._cloud_destination_object 240 241 @property 242 def stream_names(self) -> list[str]: 243 """The stream names.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return [stream.name for stream in self._connection_info.configurations.streams or []] 248 249 @property 250 def table_prefix(self) -> str: 251 """The table prefix.""" 252 if not self._connection_info: 253 self._connection_info = self._fetch_connection_info() 254 255 return self._connection_info.prefix or "" 256 257 @property 258 def connection_url(self) -> str | None: 259 """The web 
URL to the connection.""" 260 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 261 262 @property 263 def job_history_url(self) -> str | None: 264 """The URL to the job history for the connection.""" 265 return f"{self.connection_url}/timeline" 266 267 # Run Sync 268 269 def run_sync( 270 self, 271 *, 272 wait: bool = True, 273 wait_timeout: int = 300, 274 ) -> SyncResult: 275 """Run a sync.""" 276 connection_response = api_util.run_connection( 277 connection_id=self.connection_id, 278 api_root=self.workspace.api_root, 279 workspace_id=self.workspace.workspace_id, 280 client_id=self.workspace.client_id, 281 client_secret=self.workspace.client_secret, 282 bearer_token=self.workspace.bearer_token, 283 ) 284 sync_result = SyncResult( 285 workspace=self.workspace, 286 connection=self, 287 job_id=connection_response.job_id, 288 ) 289 290 if wait: 291 sync_result.wait_for_completion( 292 wait_timeout=wait_timeout, 293 raise_failure=True, 294 raise_timeout=True, 295 ) 296 297 return sync_result 298 299 def __repr__(self) -> str: 300 """String representation of the connection.""" 301 return ( 302 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 303 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 304 ) 305 306 # Logs 307 308 def get_previous_sync_logs( 309 self, 310 *, 311 limit: int = 20, 312 offset: int | None = None, 313 from_tail: bool = True, 314 job_type: JobTypeEnum | None = None, 315 ) -> list[SyncResult]: 316 """Get previous sync jobs for a connection with pagination support. 317 318 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 319 rows_synced, start_time). Full log text can be fetched lazily via 320 `SyncResult.get_full_log_text()`. 321 322 Args: 323 limit: Maximum number of jobs to return. Defaults to 20. 324 offset: Number of jobs to skip from the beginning. Defaults to None (0). 
325 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 326 If False, returns jobs ordered oldest-first (createdAt ASC). 327 Defaults to True. 328 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 329 If not specified, defaults to sync and reset jobs only (API default behavior). 330 331 Returns: 332 A list of SyncResult objects representing the sync jobs. 333 """ 334 order_by = ( 335 api_util.JOB_ORDER_BY_CREATED_AT_DESC 336 if from_tail 337 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 338 ) 339 sync_logs: list[JobResponse] = api_util.get_job_logs( 340 connection_id=self.connection_id, 341 api_root=self.workspace.api_root, 342 workspace_id=self.workspace.workspace_id, 343 limit=limit, 344 offset=offset, 345 order_by=order_by, 346 job_type=job_type, 347 client_id=self.workspace.client_id, 348 client_secret=self.workspace.client_secret, 349 bearer_token=self.workspace.bearer_token, 350 ) 351 return [ 352 SyncResult( 353 workspace=self.workspace, 354 connection=self, 355 job_id=sync_log.job_id, 356 _latest_job_info=sync_log, 357 ) 358 for sync_log in sync_logs 359 ] 360 361 def get_sync_result( 362 self, 363 job_id: int | None = None, 364 ) -> SyncResult | None: 365 """Get the sync result for the connection. 366 367 If `job_id` is not provided, the most recent sync job will be used. 368 369 Returns `None` if job_id is omitted and no previous jobs are found. 370 """ 371 if job_id is None: 372 # Get the most recent sync job 373 results = self.get_previous_sync_logs( 374 limit=1, 375 ) 376 if results: 377 return results[0] 378 379 return None 380 381 # Get the sync job by ID (lazy loaded) 382 return SyncResult( 383 workspace=self.workspace, 384 connection=self, 385 job_id=job_id, 386 ) 387 388 # Artifacts 389 390 @deprecated("Use 'dump_raw_state()' instead.") 391 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 392 """Deprecated. 
Use `dump_raw_state()` instead.""" 393 state_response = api_util.get_connection_state( 394 connection_id=self.connection_id, 395 api_root=self.workspace.api_root, 396 client_id=self.workspace.client_id, 397 client_secret=self.workspace.client_secret, 398 bearer_token=self.workspace.bearer_token, 399 ) 400 if state_response.get("stateType") == "not_set": 401 return None 402 return state_response.get("streamState", []) 403 404 @overload 405 def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ... 406 407 @overload 408 def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ... 409 410 def dump_raw_state( 411 self, 412 *, 413 normalize: bool = True, 414 ) -> dict[str, Any] | list[dict[str, Any]]: 415 """Dump the state for this connection. 416 417 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 418 with snake_case keys, suitable for passing to a connector's `--state` flag. 419 420 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 421 includes `stateType` and `connectionId`). This raw format can be passed 422 directly to `import_raw_state()` for backup/restore workflows. 423 424 Args: 425 normalize: If `True` (default), convert to Airbyte protocol format. 426 If `False`, return the raw Config API response. 427 428 Returns: 429 Normalized: list of protocol-format state message dicts (empty list if 430 no state). Raw: the full Config API state dict. 431 """ 432 raw = api_util.get_connection_state( 433 connection_id=self.connection_id, 434 api_root=self.workspace.api_root, 435 client_id=self.workspace.client_id, 436 client_secret=self.workspace.client_secret, 437 bearer_token=self.workspace.bearer_token, 438 ) 439 if normalize: 440 return _normalize_state_to_protocol(raw) 441 return raw 442 443 def import_raw_state( 444 self, 445 connection_state: dict[str, Any] | list[dict[str, Any]], 446 ) -> dict[str, Any]: 447 """Import (restore) the full state for this connection. 
448 449 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 450 > could result in broken connections, and/or incorrect sync behavior. 451 452 Replaces the entire connection state with the provided state blob. 453 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 454 455 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 456 The `connectionId` in the blob is always overridden with this connection's 457 ID, making state blobs portable across connections. 458 459 Accepts either format: 460 461 - **Config API format** (dict with `stateType`): passed through directly. 462 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 463 converted to Config API format before sending. 464 465 Args: 466 connection_state: Connection state in either Config API or Airbyte protocol format. 467 468 Returns: 469 The updated connection state as a dictionary. 470 471 Raises: 472 AirbyteConnectionSyncActiveError: If a sync is currently running on this 473 connection (HTTP 423). Wait for the sync to complete before retrying. 474 """ 475 api_state: dict[str, Any] 476 if isinstance(connection_state, list): 477 if not _is_protocol_state_format(connection_state): 478 msg = ( 479 "Expected connection_state list to contain Airbyte protocol state " 480 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 481 "or LEGACY). Got a list that does not match protocol format." 
482 ) 483 raise ValueError(msg) 484 api_state = _denormalize_protocol_state_to_api( 485 protocol_messages=connection_state, 486 connection_id=self.connection_id, 487 ) 488 elif isinstance(connection_state, dict): 489 if _is_protocol_state_format(connection_state): 490 api_state = _denormalize_protocol_state_to_api( 491 protocol_messages=[connection_state], 492 connection_id=self.connection_id, 493 ) 494 else: 495 api_state = connection_state 496 else: 497 msg = f"Expected a dict or list, got {type(connection_state)}" 498 raise TypeError(msg) 499 500 return api_util.replace_connection_state( 501 connection_id=self.connection_id, 502 connection_state_dict=api_state, 503 api_root=self.workspace.api_root, 504 client_id=self.workspace.client_id, 505 client_secret=self.workspace.client_secret, 506 bearer_token=self.workspace.bearer_token, 507 ) 508 509 def get_stream_state( 510 self, 511 stream_name: str, 512 stream_namespace: str | None = None, 513 ) -> dict[str, Any] | None: 514 """Get the state blob for a single stream within this connection. 515 516 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 517 not the full connection state envelope. 518 519 This is compatible with `stream`-type state and stream-level entries 520 within a `global`-type state. It is not compatible with `legacy` state. 521 To get or set the entire connection-level state artifact, use 522 `dump_raw_state` and `import_raw_state` instead. 523 524 Args: 525 stream_name: The name of the stream to get state for. 526 stream_namespace: The source-side stream namespace. This refers to the 527 namespace from the source (e.g., database schema), not any destination 528 namespace override set in connection advanced settings. 529 530 Returns: 531 The stream's state blob as a dictionary, or None if the stream is not found. 
532 """ 533 state_data = self.dump_raw_state(normalize=False) 534 result = ConnectionStateResponse(**state_data) 535 536 streams = _get_stream_list(result) 537 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 538 539 if not matching: 540 available = [s.stream_descriptor.name for s in streams] 541 logger.warning( 542 "Stream '%s' not found in connection state for connection '%s'. " 543 "Available streams: %s", 544 stream_name, 545 self.connection_id, 546 available, 547 ) 548 return None 549 550 return matching[0].stream_state 551 552 def set_stream_state( 553 self, 554 stream_name: str, 555 state_blob_dict: dict[str, Any], 556 stream_namespace: str | None = None, 557 ) -> None: 558 """Set the state for a single stream within this connection. 559 560 Fetches the current full state, replaces only the specified stream's state, 561 then sends the full updated state back to the API. If the stream does not 562 exist in the current state, it is appended. 563 564 This is compatible with `stream`-type state and stream-level entries 565 within a `global`-type state. It is not compatible with `legacy` state. 566 To get or set the entire connection-level state artifact, use 567 `dump_raw_state` and `import_raw_state` instead. 568 569 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 570 571 Args: 572 stream_name: The name of the stream to update state for. 573 state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}). 574 stream_namespace: The source-side stream namespace. This refers to the 575 namespace from the source (e.g., database schema), not any destination 576 namespace override set in connection advanced settings. 577 578 Raises: 579 PyAirbyteInputError: If the connection state type is not supported for 580 stream-level operations (not_set, legacy). 581 AirbyteConnectionSyncActiveError: If a sync is currently running on this 582 connection (HTTP 423). 
Wait for the sync to complete before retrying. 583 """ 584 state_data = self.dump_raw_state(normalize=False) 585 current = ConnectionStateResponse(**state_data) 586 587 if current.state_type == "not_set": 588 raise PyAirbyteInputError( 589 message="Cannot set stream state: connection has no existing state.", 590 context={"connection_id": self.connection_id}, 591 ) 592 593 if current.state_type == "legacy": 594 raise PyAirbyteInputError( 595 message="Cannot set stream state on a legacy-type connection state.", 596 context={"connection_id": self.connection_id}, 597 ) 598 599 new_stream_entry = { 600 "streamDescriptor": { 601 "name": stream_name, 602 **( 603 { 604 "namespace": stream_namespace, 605 } 606 if stream_namespace 607 else {} 608 ), 609 }, 610 "streamState": state_blob_dict, 611 } 612 613 raw_streams: list[dict[str, Any]] 614 if current.state_type == "stream": 615 raw_streams = state_data.get("streamState", []) 616 elif current.state_type == "global": 617 raw_streams = state_data.get("globalState", {}).get("streamStates", []) 618 else: 619 raw_streams = [] 620 621 streams = _get_stream_list(current) 622 found = False 623 updated_streams_raw: list[dict[str, Any]] = [] 624 for raw_s, parsed_s in zip(raw_streams, streams, strict=False): 625 if _match_stream(parsed_s, stream_name, stream_namespace): 626 updated_streams_raw.append(new_stream_entry) 627 found = True 628 else: 629 updated_streams_raw.append(raw_s) 630 631 if not found: 632 updated_streams_raw.append(new_stream_entry) 633 634 full_state: dict[str, Any] = { 635 **state_data, 636 } 637 638 if current.state_type == "stream": 639 full_state["streamState"] = updated_streams_raw 640 elif current.state_type == "global": 641 original_global = state_data.get("globalState", {}) 642 full_state["globalState"] = { 643 **original_global, 644 "streamStates": updated_streams_raw, 645 } 646 647 self.import_raw_state(full_state) 648 649 @deprecated("Use 'dump_raw_catalog()' instead.") 650 def 
get_catalog_artifact(self) -> dict[str, Any] | None: 651 """Get the configured catalog for this connection. 652 653 Returns the full configured catalog (syncCatalog) for this connection, 654 including stream schemas, sync modes, cursor fields, and primary keys. 655 656 Uses the Config API endpoint: POST /v1/web_backend/connections/get 657 658 Returns: 659 Dictionary containing the configured catalog, or `None` if not found. 660 """ 661 return self.dump_raw_catalog() 662 663 def dump_raw_catalog( 664 self, 665 *, 666 normalize: bool = True, 667 ) -> dict[str, Any] | None: 668 """Dump the configured catalog for this connection. 669 670 By default, returns the catalog in Airbyte protocol format 671 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 672 to a connector's `--catalog` flag. 673 674 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 675 Config API (camelCase keys, nested `config` block). This raw format can be 676 passed directly to `import_raw_catalog()` for backup/restore workflows. 677 678 Args: 679 normalize: If `True` (default), convert to Airbyte protocol format. 680 If `False`, return the raw Config API catalog. 681 682 Returns: 683 The configured catalog dict, or `None` if not found. 684 """ 685 connection_response = api_util.get_connection_catalog( 686 connection_id=self.connection_id, 687 api_root=self.workspace.api_root, 688 client_id=self.workspace.client_id, 689 client_secret=self.workspace.client_secret, 690 bearer_token=self.workspace.bearer_token, 691 ) 692 raw = connection_response.get("syncCatalog") 693 if raw is None: 694 return None 695 if normalize: 696 return _normalize_catalog_to_protocol(raw) 697 return raw 698 699 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 700 """Replace the configured catalog for this connection. 701 702 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 703 > could result in broken connections, and/or incorrect sync behavior. 
704 705 Accepts a configured catalog dict and replaces the connection's entire 706 catalog with it. All other connection settings remain unchanged. 707 708 Accepts either format: 709 710 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 711 passed through directly. 712 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 713 automatically converted to Config API format before sending. 714 715 Args: 716 catalog: The configured catalog dict in either format. 717 """ 718 if _is_protocol_catalog_format(catalog): 719 catalog = _denormalize_catalog_to_api(catalog) 720 721 api_util.replace_connection_catalog( 722 connection_id=self.connection_id, 723 configured_catalog_dict=catalog, 724 api_root=self.workspace.api_root, 725 client_id=self.workspace.client_id, 726 client_secret=self.workspace.client_secret, 727 bearer_token=self.workspace.bearer_token, 728 ) 729 730 def rename(self, name: str) -> CloudConnection: 731 """Rename the connection. 732 733 Args: 734 name: New name for the connection 735 736 Returns: 737 Updated CloudConnection object with refreshed info 738 """ 739 updated_response = api_util.patch_connection( 740 connection_id=self.connection_id, 741 api_root=self.workspace.api_root, 742 client_id=self.workspace.client_id, 743 client_secret=self.workspace.client_secret, 744 bearer_token=self.workspace.bearer_token, 745 name=name, 746 ) 747 self._connection_info = updated_response 748 return self 749 750 def set_table_prefix(self, prefix: str) -> CloudConnection: 751 """Set the table prefix for the connection. 
752 753 Args: 754 prefix: New table prefix to use when syncing to the destination 755 756 Returns: 757 Updated CloudConnection object with refreshed info 758 """ 759 updated_response = api_util.patch_connection( 760 connection_id=self.connection_id, 761 api_root=self.workspace.api_root, 762 client_id=self.workspace.client_id, 763 client_secret=self.workspace.client_secret, 764 bearer_token=self.workspace.bearer_token, 765 prefix=prefix, 766 ) 767 self._connection_info = updated_response 768 return self 769 770 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 771 """Set the selected streams for the connection. 772 773 This is a destructive operation that can break existing connections if the 774 stream selection is changed incorrectly. Use with caution. 775 776 Args: 777 stream_names: List of stream names to sync 778 779 Returns: 780 Updated CloudConnection object with refreshed info 781 """ 782 configurations = api_util.build_stream_configurations(stream_names) 783 784 updated_response = api_util.patch_connection( 785 connection_id=self.connection_id, 786 api_root=self.workspace.api_root, 787 client_id=self.workspace.client_id, 788 client_secret=self.workspace.client_secret, 789 bearer_token=self.workspace.bearer_token, 790 configurations=configurations, 791 ) 792 self._connection_info = updated_response 793 return self 794 795 # Enable/Disable 796 797 @property 798 def enabled(self) -> bool: 799 """Get the current enabled status of the connection. 800 801 This property always fetches fresh data from the API to ensure accuracy, 802 as another process or user may have toggled the setting. 803 804 Returns: 805 True if the connection status is 'active', False otherwise. 806 """ 807 connection_info = self._fetch_connection_info(force_refresh=True) 808 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE 809 810 @enabled.setter 811 def enabled(self, value: bool) -> None: 812 """Set the enabled status of the connection. 
813 814 Args: 815 value: True to enable (set status to 'active'), False to disable 816 (set status to 'inactive'). 817 """ 818 self.set_enabled(enabled=value) 819 820 def set_enabled( 821 self, 822 *, 823 enabled: bool, 824 ignore_noop: bool = True, 825 ) -> None: 826 """Set the enabled status of the connection. 827 828 Args: 829 enabled: True to enable (set status to 'active'), False to disable 830 (set status to 'inactive'). 831 ignore_noop: If True (default), silently return if the connection is already 832 in the requested state. If False, raise ValueError when the requested 833 state matches the current state. 834 835 Raises: 836 ValueError: If ignore_noop is False and the connection is already in the 837 requested state. 838 """ 839 # Always fetch fresh data to check current status 840 connection_info = self._fetch_connection_info(force_refresh=True) 841 current_status = connection_info.status 842 desired_status = ( 843 api_util.models.ConnectionStatusEnum.ACTIVE 844 if enabled 845 else api_util.models.ConnectionStatusEnum.INACTIVE 846 ) 847 848 if current_status == desired_status: 849 if ignore_noop: 850 return 851 raise ValueError( 852 f"Connection is already {'enabled' if enabled else 'disabled'}. " 853 f"Current status: {current_status}" 854 ) 855 856 updated_response = api_util.patch_connection( 857 connection_id=self.connection_id, 858 api_root=self.workspace.api_root, 859 client_id=self.workspace.client_id, 860 client_secret=self.workspace.client_secret, 861 bearer_token=self.workspace.bearer_token, 862 status=desired_status, 863 ) 864 self._connection_info = updated_response 865 866 # Scheduling 867 868 def set_schedule( 869 self, 870 cron_expression: str, 871 ) -> None: 872 """Set a cron schedule for the connection. 873 874 Args: 875 cron_expression: A cron expression defining when syncs should run. 
876 877 Examples: 878 - "0 0 * * *" - Daily at midnight UTC 879 - "0 */6 * * *" - Every 6 hours 880 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 881 """ 882 schedule = api_util.models.AirbyteAPIConnectionSchedule( 883 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 884 cron_expression=cron_expression, 885 ) 886 updated_response = api_util.patch_connection( 887 connection_id=self.connection_id, 888 api_root=self.workspace.api_root, 889 client_id=self.workspace.client_id, 890 client_secret=self.workspace.client_secret, 891 bearer_token=self.workspace.bearer_token, 892 schedule=schedule, 893 ) 894 self._connection_info = updated_response 895 896 def set_manual_schedule(self) -> None: 897 """Set the connection to manual scheduling. 898 899 Disables automatic syncs. Syncs will only run when manually triggered. 900 """ 901 schedule = api_util.models.AirbyteAPIConnectionSchedule( 902 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 903 ) 904 updated_response = api_util.patch_connection( 905 connection_id=self.connection_id, 906 api_root=self.workspace.api_root, 907 client_id=self.workspace.client_id, 908 client_secret=self.workspace.client_secret, 909 bearer_token=self.workspace.bearer_token, 910 schedule=schedule, 911 ) 912 self._connection_info = updated_response 913 914 # Deletions 915 916 def permanently_delete( 917 self, 918 *, 919 cascade_delete_source: bool = False, 920 cascade_delete_destination: bool = False, 921 ) -> None: 922 """Delete the connection. 923 924 Args: 925 cascade_delete_source: Whether to also delete the source. 926 cascade_delete_destination: Whether to also delete the destination. 927 """ 928 self.workspace.permanently_delete_connection(self) 929 930 if cascade_delete_source: 931 self.workspace.permanently_delete_source(self.source_id) 932 933 if cascade_delete_destination: 934 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
46 def __init__( 47 self, 48 workspace: CloudWorkspace, 49 connection_id: str, 50 source: str | None = None, 51 destination: str | None = None, 52 ) -> None: 53 """It is not recommended to create a `CloudConnection` object directly. 54 55 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 56 """ 57 self.connection_id = connection_id 58 """The ID of the connection.""" 59 60 self.workspace = workspace 61 """The workspace that the connection belongs to.""" 62 63 self._source_id = source 64 """The ID of the source.""" 65 66 self._destination_id = destination 67 """The ID of the destination.""" 68 69 self._connection_info: ConnectionResponse | None = None 70 """The connection info object. (Cached.)""" 71 72 self._cloud_source_object: CloudSource | None = None 73 """The source object. (Cached.)""" 74 75 self._cloud_destination_object: CloudDestination | None = None 76 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
149 def check_is_valid(self) -> bool: 150 """Check if this connection exists and belongs to the expected workspace. 151 152 This method fetches connection info from the API (if not already cached) and 153 verifies that the connection's workspace_id matches the workspace associated 154 with this CloudConnection object. 155 156 Returns: 157 True if the connection exists and belongs to the expected workspace. 158 159 Raises: 160 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 161 AirbyteMissingResourceError: If the connection doesn't exist. 162 """ 163 self._fetch_connection_info(force_refresh=False, verify=True) 164 return True
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
184 @property 185 def name(self) -> str | None: 186 """Get the display name of the connection, if available. 187 188 E.g. "My Postgres to Snowflake", not the connection ID. 189 """ 190 if not self._connection_info: 191 self._connection_info = self._fetch_connection_info() 192 193 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
195 @property 196 def source_id(self) -> str: 197 """The ID of the source.""" 198 if not self._source_id: 199 if not self._connection_info: 200 self._connection_info = self._fetch_connection_info() 201 202 self._source_id = self._connection_info.source_id 203 204 return self._source_id
The ID of the source.
206 @property 207 def source(self) -> CloudSource: 208 """Get the source object.""" 209 if self._cloud_source_object: 210 return self._cloud_source_object 211 212 self._cloud_source_object = CloudSource( 213 workspace=self.workspace, 214 connector_id=self.source_id, 215 ) 216 return self._cloud_source_object
Get the source object.
218 @property 219 def destination_id(self) -> str: 220 """The ID of the destination.""" 221 if not self._destination_id: 222 if not self._connection_info: 223 self._connection_info = self._fetch_connection_info() 224 225 self._destination_id = self._connection_info.destination_id 226 227 return self._destination_id
The ID of the destination.
229 @property 230 def destination(self) -> CloudDestination: 231 """Get the destination object.""" 232 if self._cloud_destination_object: 233 return self._cloud_destination_object 234 235 self._cloud_destination_object = CloudDestination( 236 workspace=self.workspace, 237 connector_id=self.destination_id, 238 ) 239 return self._cloud_destination_object
Get the destination object.
241 @property 242 def stream_names(self) -> list[str]: 243 """The stream names.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
249 @property 250 def table_prefix(self) -> str: 251 """The table prefix.""" 252 if not self._connection_info: 253 self._connection_info = self._fetch_connection_info() 254 255 return self._connection_info.prefix or ""
The table prefix.
257 @property 258 def connection_url(self) -> str | None: 259 """The web URL to the connection.""" 260 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
262 @property 263 def job_history_url(self) -> str | None: 264 """The URL to the job history for the connection.""" 265 return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
269 def run_sync( 270 self, 271 *, 272 wait: bool = True, 273 wait_timeout: int = 300, 274 ) -> SyncResult: 275 """Run a sync.""" 276 connection_response = api_util.run_connection( 277 connection_id=self.connection_id, 278 api_root=self.workspace.api_root, 279 workspace_id=self.workspace.workspace_id, 280 client_id=self.workspace.client_id, 281 client_secret=self.workspace.client_secret, 282 bearer_token=self.workspace.bearer_token, 283 ) 284 sync_result = SyncResult( 285 workspace=self.workspace, 286 connection=self, 287 job_id=connection_response.job_id, 288 ) 289 290 if wait: 291 sync_result.wait_for_completion( 292 wait_timeout=wait_timeout, 293 raise_failure=True, 294 raise_timeout=True, 295 ) 296 297 return sync_result
Run a sync.
308 def get_previous_sync_logs( 309 self, 310 *, 311 limit: int = 20, 312 offset: int | None = None, 313 from_tail: bool = True, 314 job_type: JobTypeEnum | None = None, 315 ) -> list[SyncResult]: 316 """Get previous sync jobs for a connection with pagination support. 317 318 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 319 rows_synced, start_time). Full log text can be fetched lazily via 320 `SyncResult.get_full_log_text()`. 321 322 Args: 323 limit: Maximum number of jobs to return. Defaults to 20. 324 offset: Number of jobs to skip from the beginning. Defaults to None (0). 325 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 326 If False, returns jobs ordered oldest-first (createdAt ASC). 327 Defaults to True. 328 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 329 If not specified, defaults to sync and reset jobs only (API default behavior). 330 331 Returns: 332 A list of SyncResult objects representing the sync jobs. 333 """ 334 order_by = ( 335 api_util.JOB_ORDER_BY_CREATED_AT_DESC 336 if from_tail 337 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 338 ) 339 sync_logs: list[JobResponse] = api_util.get_job_logs( 340 connection_id=self.connection_id, 341 api_root=self.workspace.api_root, 342 workspace_id=self.workspace.workspace_id, 343 limit=limit, 344 offset=offset, 345 order_by=order_by, 346 job_type=job_type, 347 client_id=self.workspace.client_id, 348 client_secret=self.workspace.client_secret, 349 bearer_token=self.workspace.bearer_token, 350 ) 351 return [ 352 SyncResult( 353 workspace=self.workspace, 354 connection=self, 355 job_id=sync_log.job_id, 356 _latest_job_info=sync_log, 357 ) 358 for sync_log in sync_logs 359 ]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
- job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:
A list of SyncResult objects representing the sync jobs.
361 def get_sync_result( 362 self, 363 job_id: int | None = None, 364 ) -> SyncResult | None: 365 """Get the sync result for the connection. 366 367 If `job_id` is not provided, the most recent sync job will be used. 368 369 Returns `None` if job_id is omitted and no previous jobs are found. 370 """ 371 if job_id is None: 372 # Get the most recent sync job 373 results = self.get_previous_sync_logs( 374 limit=1, 375 ) 376 if results: 377 return results[0] 378 379 return None 380 381 # Get the sync job by ID (lazy loaded) 382 return SyncResult( 383 workspace=self.workspace, 384 connection=self, 385 job_id=job_id, 386 )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
390 @deprecated("Use 'dump_raw_state()' instead.") 391 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 392 """Deprecated. Use `dump_raw_state()` instead.""" 393 state_response = api_util.get_connection_state( 394 connection_id=self.connection_id, 395 api_root=self.workspace.api_root, 396 client_id=self.workspace.client_id, 397 client_secret=self.workspace.client_secret, 398 bearer_token=self.workspace.bearer_token, 399 ) 400 if state_response.get("stateType") == "not_set": 401 return None 402 return state_response.get("streamState", [])
Deprecated. Use dump_raw_state() instead.
410 def dump_raw_state( 411 self, 412 *, 413 normalize: bool = True, 414 ) -> dict[str, Any] | list[dict[str, Any]]: 415 """Dump the state for this connection. 416 417 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 418 with snake_case keys, suitable for passing to a connector's `--state` flag. 419 420 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 421 includes `stateType` and `connectionId`). This raw format can be passed 422 directly to `import_raw_state()` for backup/restore workflows. 423 424 Args: 425 normalize: If `True` (default), convert to Airbyte protocol format. 426 If `False`, return the raw Config API response. 427 428 Returns: 429 Normalized: list of protocol-format state message dicts (empty list if 430 no state). Raw: the full Config API state dict. 431 """ 432 raw = api_util.get_connection_state( 433 connection_id=self.connection_id, 434 api_root=self.workspace.api_root, 435 client_id=self.workspace.client_id, 436 client_secret=self.workspace.client_secret, 437 bearer_token=self.workspace.bearer_token, 438 ) 439 if normalize: 440 return _normalize_state_to_protocol(raw) 441 return raw
Dump the state for this connection.
By default, returns a list of Airbyte protocol AirbyteStateMessage dicts
with snake_case keys, suitable for passing to a connector's --state flag.
When normalize is False, returns the raw Config API dict (camelCase keys,
includes stateType and connectionId). This raw format can be passed
directly to import_raw_state() for backup/restore workflows.
Arguments:
- normalize: If `True` (default), convert to Airbyte protocol format.
  If `False`, return the raw Config API response.
Returns:
Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.
443 def import_raw_state( 444 self, 445 connection_state: dict[str, Any] | list[dict[str, Any]], 446 ) -> dict[str, Any]: 447 """Import (restore) the full state for this connection. 448 449 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 450 > could result in broken connections, and/or incorrect sync behavior. 451 452 Replaces the entire connection state with the provided state blob. 453 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 454 455 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 456 The `connectionId` in the blob is always overridden with this connection's 457 ID, making state blobs portable across connections. 458 459 Accepts either format: 460 461 - **Config API format** (dict with `stateType`): passed through directly. 462 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 463 converted to Config API format before sending. 464 465 Args: 466 connection_state: Connection state in either Config API or Airbyte protocol format. 467 468 Returns: 469 The updated connection state as a dictionary. 470 471 Raises: 472 AirbyteConnectionSyncActiveError: If a sync is currently running on this 473 connection (HTTP 423). Wait for the sync to complete before retrying. 474 """ 475 api_state: dict[str, Any] 476 if isinstance(connection_state, list): 477 if not _is_protocol_state_format(connection_state): 478 msg = ( 479 "Expected connection_state list to contain Airbyte protocol state " 480 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 481 "or LEGACY). Got a list that does not match protocol format." 
482 ) 483 raise ValueError(msg) 484 api_state = _denormalize_protocol_state_to_api( 485 protocol_messages=connection_state, 486 connection_id=self.connection_id, 487 ) 488 elif isinstance(connection_state, dict): 489 if _is_protocol_state_format(connection_state): 490 api_state = _denormalize_protocol_state_to_api( 491 protocol_messages=[connection_state], 492 connection_id=self.connection_id, 493 ) 494 else: 495 api_state = connection_state 496 else: 497 msg = f"Expected a dict or list, got {type(connection_state)}" 498 raise TypeError(msg) 499 500 return api_util.replace_connection_state( 501 connection_id=self.connection_id, 502 connection_state_dict=api_state, 503 api_root=self.workspace.api_root, 504 client_id=self.workspace.client_id, 505 client_secret=self.workspace.client_secret, 506 bearer_token=self.workspace.bearer_token, 507 )
Import (restore) the full state for this connection.
⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
This is the counterpart to dump_raw_state() for backup/restore workflows.
The connectionId in the blob is always overridden with this connection's
ID, making state blobs portable across connections.
Accepts either format:
- Config API format (dict with `stateType`): passed through directly.
- Airbyte protocol format (list of `AirbyteStateMessage` dicts): automatically
  converted to Config API format before sending.
Arguments:
- connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:
The updated connection state as a dictionary.
Raises:
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
509 def get_stream_state( 510 self, 511 stream_name: str, 512 stream_namespace: str | None = None, 513 ) -> dict[str, Any] | None: 514 """Get the state blob for a single stream within this connection. 515 516 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 517 not the full connection state envelope. 518 519 This is compatible with `stream`-type state and stream-level entries 520 within a `global`-type state. It is not compatible with `legacy` state. 521 To get or set the entire connection-level state artifact, use 522 `dump_raw_state` and `import_raw_state` instead. 523 524 Args: 525 stream_name: The name of the stream to get state for. 526 stream_namespace: The source-side stream namespace. This refers to the 527 namespace from the source (e.g., database schema), not any destination 528 namespace override set in connection advanced settings. 529 530 Returns: 531 The stream's state blob as a dictionary, or None if the stream is not found. 532 """ 533 state_data = self.dump_raw_state(normalize=False) 534 result = ConnectionStateResponse(**state_data) 535 536 streams = _get_stream_list(result) 537 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 538 539 if not matching: 540 available = [s.stream_descriptor.name for s in streams] 541 logger.warning( 542 "Stream '%s' not found in connection state for connection '%s'. " 543 "Available streams: %s", 544 stream_name, 545 self.connection_id, 546 available, 547 ) 548 return None 549 550 return matching[0].stream_state
Get the state blob for a single stream within this connection.
Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Arguments:
- stream_name: The name of the stream to get state for.
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:
The stream's state blob as a dictionary, or None if the stream is not found.
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        # Fetch the raw (Config API, camelCase) state so the modified blob can be
        # round-tripped back through import_raw_state() unchanged except for the
        # one stream entry being replaced.
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        # Stream-level edits require an existing, non-legacy state envelope.
        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # The replacement entry, in raw Config API shape. The namespace key is
        # included only when a namespace was given.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Locate the raw per-stream entries; their position depends on the
        # envelope type ("stream" top-level vs nested under "globalState").
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Walk the raw and parsed stream lists in parallel: matching is done on
        # the parsed objects, but the untouched raw dicts are what get written
        # back, so unrelated entries survive byte-for-byte.
        # NOTE(review): zip(..., strict=False) assumes raw_streams and the parsed
        # list are the same length — presumably guaranteed because both derive
        # from the same response; confirm, else a trailing raw entry could be
        # silently dropped.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        # Append rather than replace when the stream had no prior state entry.
        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full envelope, preserving all sibling keys (stateType,
        # connectionId, any global shared state), and splice the updated list
        # back into the slot appropriate for the envelope type.
        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)
Set the state for a single stream within this connection.
Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Uses the safe variant that prevents updates while a sync is running (HTTP 423).
Arguments:
- stream_name: The name of the stream to update state for.
- state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
- PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
649 @deprecated("Use 'dump_raw_catalog()' instead.") 650 def get_catalog_artifact(self) -> dict[str, Any] | None: 651 """Get the configured catalog for this connection. 652 653 Returns the full configured catalog (syncCatalog) for this connection, 654 including stream schemas, sync modes, cursor fields, and primary keys. 655 656 Uses the Config API endpoint: POST /v1/web_backend/connections/get 657 658 Returns: 659 Dictionary containing the configured catalog, or `None` if not found. 660 """ 661 return self.dump_raw_catalog()
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or `None` if not found.
663 def dump_raw_catalog( 664 self, 665 *, 666 normalize: bool = True, 667 ) -> dict[str, Any] | None: 668 """Dump the configured catalog for this connection. 669 670 By default, returns the catalog in Airbyte protocol format 671 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 672 to a connector's `--catalog` flag. 673 674 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 675 Config API (camelCase keys, nested `config` block). This raw format can be 676 passed directly to `import_raw_catalog()` for backup/restore workflows. 677 678 Args: 679 normalize: If `True` (default), convert to Airbyte protocol format. 680 If `False`, return the raw Config API catalog. 681 682 Returns: 683 The configured catalog dict, or `None` if not found. 684 """ 685 connection_response = api_util.get_connection_catalog( 686 connection_id=self.connection_id, 687 api_root=self.workspace.api_root, 688 client_id=self.workspace.client_id, 689 client_secret=self.workspace.client_secret, 690 bearer_token=self.workspace.bearer_token, 691 ) 692 raw = connection_response.get("syncCatalog") 693 if raw is None: 694 return None 695 if normalize: 696 return _normalize_catalog_to_protocol(raw) 697 return raw
Dump the configured catalog for this connection.
By default, returns the catalog in Airbyte protocol format
(ConfiguredAirbyteCatalog with snake_case keys), suitable for passing
to a connector's --catalog flag.
When normalize is False, returns the raw syncCatalog dict from the
Config API (camelCase keys, nested config block). This raw format can be
passed directly to import_raw_catalog() for backup/restore workflows.
Arguments:
- normalize: If `True` (default), convert to Airbyte protocol format.
  If `False`, return the raw Config API catalog.
Returns:
The configured catalog dict, or `None` if not found.
699 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 700 """Replace the configured catalog for this connection. 701 702 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 703 > could result in broken connections, and/or incorrect sync behavior. 704 705 Accepts a configured catalog dict and replaces the connection's entire 706 catalog with it. All other connection settings remain unchanged. 707 708 Accepts either format: 709 710 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 711 passed through directly. 712 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 713 automatically converted to Config API format before sending. 714 715 Args: 716 catalog: The configured catalog dict in either format. 717 """ 718 if _is_protocol_catalog_format(catalog): 719 catalog = _denormalize_catalog_to_api(catalog) 720 721 api_util.replace_connection_catalog( 722 connection_id=self.connection_id, 723 configured_catalog_dict=catalog, 724 api_root=self.workspace.api_root, 725 client_id=self.workspace.client_id, 726 client_secret=self.workspace.client_secret, 727 bearer_token=self.workspace.bearer_token, 728 )
Replace the configured catalog for this connection.
⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.
Accepts either format:
- Config API format (`syncCatalog` with camelCase keys and nested `config`):
  passed through directly.
- Airbyte protocol format (`ConfiguredAirbyteCatalog` with snake_case keys):
  automatically converted to Config API format before sending.
Arguments:
- catalog: The configured catalog dict in either format.
730 def rename(self, name: str) -> CloudConnection: 731 """Rename the connection. 732 733 Args: 734 name: New name for the connection 735 736 Returns: 737 Updated CloudConnection object with refreshed info 738 """ 739 updated_response = api_util.patch_connection( 740 connection_id=self.connection_id, 741 api_root=self.workspace.api_root, 742 client_id=self.workspace.client_id, 743 client_secret=self.workspace.client_secret, 744 bearer_token=self.workspace.bearer_token, 745 name=name, 746 ) 747 self._connection_info = updated_response 748 return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
750 def set_table_prefix(self, prefix: str) -> CloudConnection: 751 """Set the table prefix for the connection. 752 753 Args: 754 prefix: New table prefix to use when syncing to the destination 755 756 Returns: 757 Updated CloudConnection object with refreshed info 758 """ 759 updated_response = api_util.patch_connection( 760 connection_id=self.connection_id, 761 api_root=self.workspace.api_root, 762 client_id=self.workspace.client_id, 763 client_secret=self.workspace.client_secret, 764 bearer_token=self.workspace.bearer_token, 765 prefix=prefix, 766 ) 767 self._connection_info = updated_response 768 return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
770 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 771 """Set the selected streams for the connection. 772 773 This is a destructive operation that can break existing connections if the 774 stream selection is changed incorrectly. Use with caution. 775 776 Args: 777 stream_names: List of stream names to sync 778 779 Returns: 780 Updated CloudConnection object with refreshed info 781 """ 782 configurations = api_util.build_stream_configurations(stream_names) 783 784 updated_response = api_util.patch_connection( 785 connection_id=self.connection_id, 786 api_root=self.workspace.api_root, 787 client_id=self.workspace.client_id, 788 client_secret=self.workspace.client_secret, 789 bearer_token=self.workspace.bearer_token, 790 configurations=configurations, 791 ) 792 self._connection_info = updated_response 793 return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
797 @property 798 def enabled(self) -> bool: 799 """Get the current enabled status of the connection. 800 801 This property always fetches fresh data from the API to ensure accuracy, 802 as another process or user may have toggled the setting. 803 804 Returns: 805 True if the connection status is 'active', False otherwise. 806 """ 807 connection_info = self._fetch_connection_info(force_refresh=True) 808 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
820 def set_enabled( 821 self, 822 *, 823 enabled: bool, 824 ignore_noop: bool = True, 825 ) -> None: 826 """Set the enabled status of the connection. 827 828 Args: 829 enabled: True to enable (set status to 'active'), False to disable 830 (set status to 'inactive'). 831 ignore_noop: If True (default), silently return if the connection is already 832 in the requested state. If False, raise ValueError when the requested 833 state matches the current state. 834 835 Raises: 836 ValueError: If ignore_noop is False and the connection is already in the 837 requested state. 838 """ 839 # Always fetch fresh data to check current status 840 connection_info = self._fetch_connection_info(force_refresh=True) 841 current_status = connection_info.status 842 desired_status = ( 843 api_util.models.ConnectionStatusEnum.ACTIVE 844 if enabled 845 else api_util.models.ConnectionStatusEnum.INACTIVE 846 ) 847 848 if current_status == desired_status: 849 if ignore_noop: 850 return 851 raise ValueError( 852 f"Connection is already {'enabled' if enabled else 'disabled'}. " 853 f"Current status: {current_status}" 854 ) 855 856 updated_response = api_util.patch_connection( 857 connection_id=self.connection_id, 858 api_root=self.workspace.api_root, 859 client_id=self.workspace.client_id, 860 client_secret=self.workspace.client_secret, 861 bearer_token=self.workspace.bearer_token, 862 status=desired_status, 863 ) 864 self._connection_info = updated_response
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
868 def set_schedule( 869 self, 870 cron_expression: str, 871 ) -> None: 872 """Set a cron schedule for the connection. 873 874 Args: 875 cron_expression: A cron expression defining when syncs should run. 876 877 Examples: 878 - "0 0 * * *" - Daily at midnight UTC 879 - "0 */6 * * *" - Every 6 hours 880 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 881 """ 882 schedule = api_util.models.AirbyteAPIConnectionSchedule( 883 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 884 cron_expression=cron_expression, 885 ) 886 updated_response = api_util.patch_connection( 887 connection_id=self.connection_id, 888 api_root=self.workspace.api_root, 889 client_id=self.workspace.client_id, 890 client_secret=self.workspace.client_secret, 891 bearer_token=self.workspace.bearer_token, 892 schedule=schedule, 893 ) 894 self._connection_info = updated_response
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
896 def set_manual_schedule(self) -> None: 897 """Set the connection to manual scheduling. 898 899 Disables automatic syncs. Syncs will only run when manually triggered. 900 """ 901 schedule = api_util.models.AirbyteAPIConnectionSchedule( 902 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 903 ) 904 updated_response = api_util.patch_connection( 905 connection_id=self.connection_id, 906 api_root=self.workspace.api_root, 907 client_id=self.workspace.client_id, 908 client_secret=self.workspace.client_secret, 909 bearer_token=self.workspace.bearer_token, 910 schedule=schedule, 911 ) 912 self._connection_info = updated_response
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
916 def permanently_delete( 917 self, 918 *, 919 cascade_delete_source: bool = False, 920 cascade_delete_destination: bool = False, 921 ) -> None: 922 """Delete the connection. 923 924 Args: 925 cascade_delete_source: Whether to also delete the source. 926 cascade_delete_destination: Whether to also delete the destination. 927 """ 928 self.workspace.permanently_delete_connection(self) 929 930 if cascade_delete_source: 931 self.workspace.permanently_delete_source(self.source_id) 932 933 if cascade_delete_destination: 934 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
@dataclass
class CloudClientConfig:
    """Client configuration for Airbyte Cloud API.

    Encapsulates the authentication and API settings needed to connect to
    Airbyte Cloud, OSS, or Enterprise instances. Two mutually exclusive
    authentication methods are supported:

    1. OAuth2 client credentials flow (client_id + client_secret)
    2. Bearer token authentication

    Exactly one authentication method must be provided. Providing both or
    neither raises a validation error.

    Attributes:
        client_id: OAuth2 client ID for client credentials flow.
        client_secret: OAuth2 client secret for client credentials flow.
        bearer_token: Pre-generated bearer token for direct authentication.
        api_root: The API root URL. Defaults to Airbyte Cloud API.
    """

    client_id: SecretString | None = None
    """OAuth2 client ID for client credentials authentication."""

    client_secret: SecretString | None = None
    """OAuth2 client secret for client credentials authentication."""

    bearer_token: SecretString | None = None
    """Bearer token for direct authentication (alternative to client credentials)."""

    api_root: str = api_util.CLOUD_API_ROOT
    """The API root URL.

    Defaults to Airbyte Cloud API."""

    def __post_init__(self) -> None:
        """Validate credentials and ensure secrets are properly wrapped."""
        # Coerce any plain-string secrets into SecretString wrappers.
        for attr_name in ("client_id", "client_secret", "bearer_token"):
            attr_value = getattr(self, attr_name)
            if attr_value is not None:
                setattr(self, attr_name, SecretString(attr_value))

        # A partial credential pair still counts as "using client credentials"
        # so that the incomplete-pair check below can fire.
        using_client_creds = self.client_id is not None or self.client_secret is not None
        using_bearer = self.bearer_token is not None

        if using_client_creds and using_bearer:
            raise PyAirbyteInputError(
                message="Cannot use both client credentials and bearer token authentication.",
                guidance=(
                    "Provide either client_id and client_secret together, "
                    "or bearer_token alone, but not both."
                ),
            )

        if using_client_creds and (self.client_id is None or self.client_secret is None):
            # Client credentials must come as a complete pair.
            raise PyAirbyteInputError(
                message="Incomplete client credentials.",
                guidance=(
                    "When using client credentials authentication, "
                    "both client_id and client_secret must be provided."
                ),
            )

        if not using_client_creds and not using_bearer:
            raise PyAirbyteInputError(
                message="No authentication credentials provided.",
                guidance=(
                    "Provide either client_id and client_secret together for OAuth2 "
                    "client credentials flow, or bearer_token for direct authentication."
                ),
            )

    @property
    def uses_bearer_token(self) -> bool:
        """Return True if using bearer token authentication."""
        return self.bearer_token is not None

    @property
    def uses_client_credentials(self) -> bool:
        """Return True if using client credentials authentication."""
        return self.client_id is not None and self.client_secret is not None

    @classmethod
    def from_env(
        cls,
        *,
        api_root: str | None = None,
    ) -> CloudClientConfig:
        """Create CloudClientConfig from environment variables.

        Resolves credentials from environment variables, so no secrets need to
        be passed explicitly.

        Environment variables used:
        - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
        - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
        - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
        - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        A bearer token is checked first; if absent, client credentials are used.

        Args:
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudClientConfig instance configured with credentials from the
            environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found
                in the environment.
        """
        resolved_root = resolve_cloud_api_url(api_root)

        token = resolve_cloud_bearer_token()
        if not token:
            # No bearer token in the environment; fall back to client credentials.
            return cls(
                client_id=resolve_cloud_client_id(),
                client_secret=resolve_cloud_client_secret(),
                api_root=resolved_root,
            )
        return cls(
            bearer_token=token,
            api_root=resolved_root,
        )
Client configuration for Airbyte Cloud API.
This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:
- OAuth2 client credentials flow (client_id + client_secret)
- Bearer token authentication
Exactly one authentication method must be provided. Providing both or neither will raise a validation error.
Attributes:
- client_id: OAuth2 client ID for client credentials flow.
- client_secret: OAuth2 client secret for client credentials flow.
- bearer_token: Pre-generated bearer token for direct authentication.
- api_root: The API root URL. Defaults to Airbyte Cloud API.
132 @property 133 def uses_bearer_token(self) -> bool: 134 """Return True if using bearer token authentication.""" 135 return self.bearer_token is not None
Return True if using bearer token authentication.
137 @property 138 def uses_client_credentials(self) -> bool: 139 """Return True if using client credentials authentication.""" 140 return self.client_id is not None and self.client_secret is not None
Return True if using client credentials authentication.
142 @classmethod 143 def from_env( 144 cls, 145 *, 146 api_root: str | None = None, 147 ) -> CloudClientConfig: 148 """Create CloudClientConfig from environment variables. 149 150 This factory method resolves credentials from environment variables, 151 providing a convenient way to create credentials without explicitly 152 passing secrets. 153 154 Environment variables used: 155 - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow). 156 - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow). 157 - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials). 158 - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud). 159 160 The method will first check for a bearer token. If not found, it will 161 attempt to use client credentials. 162 163 Args: 164 api_root: The API root URL. If not provided, will be resolved from 165 the `AIRBYTE_CLOUD_API_URL` environment variable, or default to 166 the Airbyte Cloud API. 167 168 Returns: 169 A CloudClientConfig instance configured with credentials from the environment. 170 171 Raises: 172 PyAirbyteSecretNotFoundError: If required credentials are not found in 173 the environment. 174 """ 175 resolved_api_root = resolve_cloud_api_url(api_root) 176 177 # Try bearer token first 178 bearer_token = resolve_cloud_bearer_token() 179 if bearer_token: 180 return cls( 181 bearer_token=bearer_token, 182 api_root=resolved_api_root, 183 ) 184 185 # Fall back to client credentials 186 return cls( 187 client_id=resolve_cloud_client_id(), 188 client_secret=resolve_cloud_client_secret(), 189 api_root=resolved_api_root, 190 )
Create CloudClientConfig from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.
Environment variables used:
- `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
- `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
- `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
- `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
The method will first check for a bearer token. If not found, it will attempt to use client credentials.
Arguments:
- api_root: The API root URL. If not provided, will be resolved from the
  `AIRBYTE_CLOUD_API_URL` environment variable, or default to the Airbyte Cloud API.
Returns:
A CloudClientConfig instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    # Context for the job: the workspace and connection it ran under.
    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    # Optional prefix/suffix applied when resolving destination table names.
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    # Lazily-populated caches; each is fetched on first use and reused thereafter.
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
        E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job.

        The response is cached on the instance; pass `force_refresh=True` to re-fetch.
        """
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job, as a plain dict."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job has reached a final status."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job.

        The cached response is reused only once the job has reached a final
        status; while the job is still running, it is re-fetched on every call.
        """
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed, or 0 if not reported."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed, or 0 if not reported."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            # Some API responses carry a start-time format that ab_datetime_parse
            # rejects; fall back to the raw Config API payload in that case.
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            # Re-raise when the fallback did not produce a usable value.
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job, in attempt order (0-based)."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise a `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running.

        Polls the job status until it reaches a final status or `wait_timeout`
        seconds have elapsed, sleeping `api_util.JOB_WAIT_INTERVAL_SECS` between polls.
        """
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names configured on the connection."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            # Keep a reference to the owning SyncResult; all lookups delegate to it.
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by
interacting with the .CloudWorkspace and .CloudConnection objects.
236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
275 def is_job_complete(self) -> bool: 276 """Check if the sync job is complete.""" 277 return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
279 def get_job_status(self) -> JobStatusEnum: 280 """Check if the sync job is still running.""" 281 return self._fetch_latest_job_info().status
Return the latest status of the sync job.
297 @property 298 def bytes_synced(self) -> int: 299 """Return the number of records processed.""" 300 return self._fetch_latest_job_info().bytes_synced or 0
Return the number of bytes synced.
302 @property 303 def records_synced(self) -> int: 304 """Return the number of records processed.""" 305 return self._fetch_latest_job_info().rows_synced or 0
Return the number of records processed.
307 @property 308 def start_time(self) -> datetime: 309 """Return the start time of the sync job in UTC.""" 310 try: 311 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 312 except (ValueError, TypeError) as e: 313 if "Invalid isoformat string" in str(e): 314 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 315 api_root=self.workspace.api_root, 316 path="/jobs/get", 317 json={"id": self.job_id}, 318 client_id=self.workspace.client_id, 319 client_secret=self.workspace.client_secret, 320 bearer_token=self.workspace.bearer_token, 321 ) 322 raw_start_time = job_info_raw.get("startTime") 323 if raw_start_time: 324 return ab_datetime_parse(raw_start_time) 325 raise
Return the start time of the sync job in UTC.
344 def get_attempts(self) -> list[SyncAttempt]: 345 """Return a list of attempts for this sync job.""" 346 job_with_attempts = self._fetch_job_with_attempts() 347 attempts_data = job_with_attempts.get("attempts", []) 348 349 return [ 350 SyncAttempt( 351 workspace=self.workspace, 352 connection=self.connection, 353 job_id=self.job_id, 354 attempt_number=i, 355 _attempt_data=attempt_data, 356 ) 357 for i, attempt_data in enumerate(attempts_data, start=0) 358 ]
Return a list of attempts for this sync job.
360 def raise_failure_status( 361 self, 362 *, 363 refresh_status: bool = False, 364 ) -> None: 365 """Raise an exception if the sync job failed. 366 367 By default, this method will use the latest status available. If you want to refresh the 368 status before checking for failure, set `refresh_status=True`. If the job has failed, this 369 method will raise a `AirbyteConnectionSyncError`. 370 371 Otherwise, do nothing. 372 """ 373 if not refresh_status and self._latest_job_info: 374 latest_status = self._latest_job_info.status 375 else: 376 latest_status = self.get_job_status() 377 378 if latest_status in FAILED_STATUSES: 379 raise AirbyteConnectionSyncError( 380 workspace=self.workspace, 381 connection_id=self.connection.connection_id, 382 job_id=self.job_id, 383 job_status=self.get_job_status(), 384 )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set refresh_status=True. If the job has failed, this
method will raise a AirbyteConnectionSyncError.
Otherwise, do nothing.
386 def wait_for_completion( 387 self, 388 *, 389 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 390 raise_timeout: bool = True, 391 raise_failure: bool = False, 392 ) -> JobStatusEnum: 393 """Wait for a job to finish running.""" 394 start_time = time.time() 395 while True: 396 latest_status = self.get_job_status() 397 if latest_status in FINAL_STATUSES: 398 if raise_failure: 399 # No-op if the job succeeded or is still running: 400 self.raise_failure_status() 401 402 return latest_status 403 404 if time.time() - start_time > wait_timeout: 405 if raise_timeout: 406 raise AirbyteConnectionSyncTimeoutError( 407 workspace=self.workspace, 408 connection_id=self.connection.connection_id, 409 job_id=self.job_id, 410 job_status=latest_status, 411 timeout=wait_timeout, 412 ) 413 414 return latest_status # This will be a non-final status 415 416 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
418 def get_sql_cache(self) -> CacheBase: 419 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 420 if self._cache: 421 return self._cache 422 423 destination_configuration = self._get_destination_configuration() 424 self._cache = destination_to_cache(destination_configuration=destination_configuration) 425 return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination's.
427 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 428 """Return a SQL Engine for querying a SQL-based destination.""" 429 return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
431 def get_sql_table_name(self, stream_name: str) -> str: 432 """Return the SQL table name of the named stream.""" 433 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
435 def get_sql_table( 436 self, 437 stream_name: str, 438 ) -> sqlalchemy.Table: 439 """Return a SQLAlchemy table object for the named stream.""" 440 return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
442 def get_dataset(self, stream_name: str) -> CachedDataset: 443 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 444 445 This can be used to read and analyze the data in a SQL-based destination. 446 447 TODO: In a future iteration, we can consider providing stream configuration information 448 (catalog information) to the `CachedDataset` object via the "Get stream properties" 449 API: https://reference.airbyte.com/reference/getstreamproperties 450 """ 451 return CachedDataset( 452 self.get_sql_cache(), 453 stream_name=stream_name, 454 stream_configuration=False, # Don't look for stream configuration in cache. 455 )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the CachedDataset object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
457 def get_sql_database_name(self) -> str: 458 """Return the SQL database name.""" 459 cache = self.get_sql_cache() 460 return cache.get_database_name()
Return the SQL database name.
462 def get_sql_schema_name(self) -> str: 463 """Return the SQL schema name.""" 464 cache = self.get_sql_cache() 465 return cache.schema_name
Return the SQL schema name.
467 @property 468 def stream_names(self) -> list[str]: 469 """Return the set of stream names.""" 470 return self.connection.stream_names
Return the set of stream names.
472 @final 473 @property 474 def streams( 475 self, 476 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 477 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 478 479 This is a convenience wrapper around the `stream_names` 480 property and `get_dataset()` method. 481 """ 482 return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names
property and get_dataset() method.
class JobStatusEnum(str, Enum):
    """Status values reported for an Airbyte Cloud sync job.

    Mixes in `str` so members compare equal to, and serialize as, their
    plain string values.
    """

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
Enumeration of the possible statuses of an Airbyte Cloud sync job.