# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination

If your destination is supported, you can read records directly from the `SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
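If you prefer a DataFrame workflow, the same dataset can be handed to pandas. A minimal sketch, assuming `pandas` is installed and that the dataset exposes `to_pandas()` (as PyAirbyte's `CachedDataset` does):

```python
# Convert the cached dataset to a pandas DataFrame for analysis.
# Assumes `dataset` was obtained via sync_result.get_dataset("users") above.
users_df = dataset.to_pandas()
print(users_df.head())
```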
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import client_config, connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
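The `__all__` list above defines the module's public surface; in practice you import the classes directly. A short usage sketch with the exported names, assuming the enum exposes a `SUCCEEDED` member (as the upstream `JobStatusEnum` does):

```python
from airbyte.cloud import CloudWorkspace, JobStatusEnum

workspace = CloudWorkspace.from_env()  # credentials from environment variables
connection = workspace.get_connection(connection_id="...")  # ID elided here

# JobStatusEnum values can be compared against a sync's final status.
result = connection.run_sync()
if result.get_job_status() == JobStatusEnum.SUCCEEDED:
    print("Sync completed successfully.")
```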
````python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
````

A remote workspace on the Airbyte Cloud.

By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication

Example with client credentials:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

Example with bearer token:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```
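Since `api_root` is overridable, the same class can target a self-managed deployment. A sketch, where the URL is a placeholder for your own instance's API root:

```python
# Point the workspace at a self-managed (OSS/Enterprise) instance.
# The api_root below is hypothetical; substitute your deployment's API endpoint.
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
    api_root="https://airbyte.internal.example.com/api/public/v1",
)
```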
```python
CloudWorkspace(
    workspace_id: str,
    client_id: airbyte.secrets.SecretString | None = None,
    client_secret: airbyte.secrets.SecretString | None = None,
    api_root: str = 'https://api.airbyte.com/v1',
    bearer_token: airbyte.secrets.SecretString | None = None,
)
```
```python
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
) -> CloudWorkspace:
```

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)

Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.

Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.

Example:

```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
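For local experiments, the environment variables can also be set in-process before calling `from_env()`. A sketch with placeholder values:

```python
import os

# Placeholder values; in production these would come from your secret manager.
os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"] = "..."
os.environ["AIRBYTE_CLOUD_CLIENT_ID"] = "..."
os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"] = "..."

workspace = CloudWorkspace.from_env()
```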
```python
workspace_url: str | None
```

The web URL of the workspace.

```python
def get_organization(
    self,
    *,
    raise_on_error: bool = True,
) -> airbyte.cloud.workspaces.CloudOrganization | None:
```

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
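Because workspace-scoped credentials may lack ORGANIZATION_READER, a tolerant lookup is often the safer pattern. A sketch:

```python
# Returns None instead of raising when permissions are insufficient.
org = workspace.get_organization(raise_on_error=False)
if org is None:
    print("Organization info unavailable (insufficient permissions?)")
else:
    print(f"{org.organization_name} ({org.organization_id})")
```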
```python
def connect(self) -> None:
```

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
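A common use is a fail-fast credentials check at startup. A sketch; narrowing the except clause to the specific Airbyte error type is left as an exercise, since the exact exception depends on the failure mode:

```python
try:
    workspace.connect()  # prints the workspace URL on success
except Exception as err:  # narrow this in real code
    raise SystemExit(f"Workspace unreachable or credentials invalid: {err}")
```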

```python
def get_connection(self, connection_id: str) -> CloudConnection:
```

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.
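Because construction is lazy, creating the object costs nothing; API traffic starts when you use it. A sketch:

```python
connection = workspace.get_connection(connection_id="456")

# No API calls have happened yet; the line below triggers the actual sync job.
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```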

```python
def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
```

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

```python
def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
```

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

```python
def deploy_source(
    self,
    name: str,
    source: airbyte.Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> airbyte.cloud.connectors.CloudSource:
```

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
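A sketch of deploying a locally configured source. The connector name (`source-faker`) and its config are illustrative only; any locally configured `Source` works the same way:

```python
import airbyte as ab

# Configure a local source object (connector and config are examples).
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)

cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # avoids collisions when unique=True
)
print(cloud_source.connector_id)
```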
```python
def deploy_destination(
    self,
    name: str,
    destination: airbyte.Destination | dict[str, typing.Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> airbyte.cloud.connectors.CloudDestination:
```

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
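When passing a plain dict, the `destinationType` key is required. A sketch with illustrative config values (the `destination_path` key is an assumption about the DuckDB connector's config shape):

```python
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",  # required key when passing a dict
        "destination_path": "/tmp/airbyte.duckdb",  # illustrative config value
    },
)
```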
```python
def permanently_delete_source(
    self,
    source: str | airbyte.cloud.connectors.CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
```

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
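With safe mode on, only resources whose names opt in can be deleted, so disposable resources are usually named accordingly. A sketch (reusing the `source` object from the deploy example above):

```python
# Name contains "delete-me", so deletion passes the safe-mode check.
cloud_source = workspace.deploy_source(
    name="delete-me: scratch source",
    source=source,
)
workspace.permanently_delete_source(cloud_source)

# Deleting a differently named source requires opting out of safe mode:
# workspace.permanently_delete_source(source_id, safe_mode=False)
```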
```python
def permanently_delete_destination(
    self,
    destination: str | airbyte.cloud.connectors.CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
```

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: airbyte.cloud.connectors.CloudSource | str,
    selected_streams: list[str],
    destination: airbyte.cloud.connectors.CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
659    def deploy_connection(
660        self,
661        connection_name: str,
662        *,
663        source: CloudSource | str,
664        selected_streams: list[str],
665        destination: CloudDestination | str,
666        table_prefix: str | None = None,
667    ) -> CloudConnection:
668        """Create a new connection between an already deployed source and destination.
669
670        Returns the newly deployed connection object.
671
672        Args:
673            connection_name: The name of the connection.
674            source: The deployed source. You can pass a source ID or a CloudSource object.
675            destination: The deployed destination. You can pass a destination ID or a
676                CloudDestination object.
677            table_prefix: Optional. The table prefix to use when syncing to the destination.
678            selected_streams: The selected stream names to sync within the connection.
679        """
680        if not selected_streams:
681            raise exc.PyAirbyteInputError(
682                guidance="You must provide `selected_streams` when creating a connection."
683            )
684
685        source_id: str = source if isinstance(source, str) else source.connector_id
686        destination_id: str = (
687            destination if isinstance(destination, str) else destination.connector_id
688        )
689
690        deployed_connection = api_util.create_connection(
691            name=connection_name,
692            source_id=source_id,
693            destination_id=destination_id,
694            api_root=self.api_root,
695            workspace_id=self.workspace_id,
696            selected_stream_names=selected_streams,
697            prefix=table_prefix or "",
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701        )
702
703        return CloudConnection(
704            workspace=self,
705            connection_id=deployed_connection.connection_id,
706            source=deployed_connection.source_id,
707            destination=deployed_connection.destination_id,
708        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
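A minimal sketch, assuming the source and destination are already deployed (all IDs and names below are hypothetical):

connection = workspace.deploy_connection(
    connection_name="My Postgres to Snowflake",
    source="source-id-123",            # or a CloudSource object
    destination="destination-id-456",  # or a CloudDestination object
    selected_streams=["users", "orders"],
    table_prefix="pyairbyte_",
)
print(connection.connection_id)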
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
710    def permanently_delete_connection(
711        self,
712        connection: str | CloudConnection,
713        *,
714        cascade_delete_source: bool = False,
715        cascade_delete_destination: bool = False,
716        safe_mode: bool = True,
717    ) -> None:
718        """Delete a deployed connection from the workspace.
719
720        Args:
721            connection: The connection ID or CloudConnection object to delete
722            cascade_delete_source: If True, also delete the source after deleting the connection
723            cascade_delete_destination: If True, also delete the destination after deleting
724                the connection
725            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
726                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
727                to cascade deletes.
728        """
729        if connection is None:
730            raise ValueError("No connection ID provided.")
731
732        if isinstance(connection, str):
733            connection = CloudConnection(
734                workspace=self,
735                connection_id=connection,
736            )
737
738        api_util.delete_connection(
739            connection_id=connection.connection_id,
740            connection_name=connection.name,
741            api_root=self.api_root,
742            workspace_id=self.workspace_id,
743            client_id=self.client_id,
744            client_secret=self.client_secret,
745            bearer_token=self.bearer_token,
746            safe_mode=safe_mode,
747        )
748
749        if cascade_delete_source:
750            self.permanently_delete_source(
751                source=connection.source_id,
752                safe_mode=safe_mode,
753            )
754        if cascade_delete_destination:
755            self.permanently_delete_destination(
756                destination=connection.destination_id,
757                safe_mode=safe_mode,
758            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
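A sketch with a hypothetical connection ID:

# Cascade deletes remove the source and destination after the connection;
# safe_mode name checks apply to each resource being deleted.
workspace.permanently_delete_connection(
    connection="connection-id-789",
    cascade_delete_source=True,
    cascade_delete_destination=True,
)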
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
762    def list_connections(
763        self,
764        name: str | None = None,
765        *,
766        name_filter: Callable | None = None,
767    ) -> list[CloudConnection]:
768        """List connections in the workspace, optionally filtering by name.
769
770        TODO: Add pagination support
771        """
772        connections = api_util.list_connections(
773            api_root=self.api_root,
774            workspace_id=self.workspace_id,
775            name=name,
776            name_filter=name_filter,
777            client_id=self.client_id,
778            client_secret=self.client_secret,
779            bearer_token=self.bearer_token,
780        )
781        return [
782            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
783                workspace=self,
784                connection_response=connection,
785            )
786            for connection in connections
787            if name is None or connection.name == name
788        ]

List connections in the workspace, optionally filtering by name.

TODO: Add pagination support
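Two filtering sketches (names are hypothetical; this assumes the name_filter callable receives each connection's name):

# Exact name match
connections = workspace.list_connections(name="My Postgres to Snowflake")

# Callable filter
connections = workspace.list_connections(
    name_filter=lambda name: name.startswith("prod-"),
)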

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
790    def list_sources(
791        self,
792        name: str | None = None,
793        *,
794        name_filter: Callable | None = None,
795    ) -> list[CloudSource]:
796        """List all sources in the workspace.
797
798        TODO: Add pagination support
799        """
800        sources = api_util.list_sources(
801            api_root=self.api_root,
802            workspace_id=self.workspace_id,
803            name=name,
804            name_filter=name_filter,
805            client_id=self.client_id,
806            client_secret=self.client_secret,
807            bearer_token=self.bearer_token,
808        )
809        return [
810            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
811                workspace=self,
812                source_response=source,
813            )
814            for source in sources
815            if name is None or source.name == name
816        ]

List all sources in the workspace.

TODO: Add pagination support
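For example, iterating over the results:

for source in workspace.list_sources():
    print(source.connector_id, source.name)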

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
818    def list_destinations(
819        self,
820        name: str | None = None,
821        *,
822        name_filter: Callable | None = None,
823    ) -> list[CloudDestination]:
824        """List all destinations in the workspace.
825
826        TODO: Add pagination support
827        """
828        destinations = api_util.list_destinations(
829            api_root=self.api_root,
830            workspace_id=self.workspace_id,
831            name=name,
832            name_filter=name_filter,
833            client_id=self.client_id,
834            client_secret=self.client_secret,
835            bearer_token=self.bearer_token,
836        )
837        return [
838            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
839                workspace=self,
840                destination_response=destination,
841            )
842            for destination in destinations
843            if name is None or destination.name == name
844        ]

List all destinations in the workspace.

TODO: Add pagination support
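Likewise for destinations:

for destination in workspace.list_destinations():
    print(destination.connector_id, destination.name)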

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
846    def publish_custom_source_definition(
847        self,
848        name: str,
849        *,
850        manifest_yaml: dict[str, Any] | Path | str | None = None,
851        docker_image: str | None = None,
852        docker_tag: str | None = None,
853        unique: bool = True,
854        pre_validate: bool = True,
855        testing_values: dict[str, Any] | None = None,
856    ) -> CustomCloudSourceDefinition:
857        """Publish a custom source connector definition.
858
859        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
860        and docker_tag (for Docker connectors), but not both.
861
862        Args:
863            name: Display name for the connector definition
864            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
865            docker_image: Docker repository (e.g., 'airbyte/source-custom')
866            docker_tag: Docker image tag (e.g., '1.0.0')
867            unique: Whether to enforce name uniqueness
868            pre_validate: Whether to validate manifest client-side (YAML only)
869            testing_values: Optional configuration values to use for testing in the
870                Connector Builder UI. If provided, these values are stored as the complete
871                testing values object for the connector builder project (replaces any existing
872                values), allowing immediate test read operations.
873
874        Returns:
875            CustomCloudSourceDefinition object representing the created definition
876
877        Raises:
878            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image are provided
879            AirbyteDuplicateResourcesError: If unique=True and name already exists
880        """
881        is_yaml = manifest_yaml is not None
882        is_docker = docker_image is not None
883
884        if is_yaml == is_docker:
885            raise exc.PyAirbyteInputError(
886                message=(
887                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
888                    "docker_image + docker_tag (for Docker connectors), but not both"
889                ),
890                context={
891                    "manifest_yaml_provided": is_yaml,
892                    "docker_image_provided": is_docker,
893                },
894            )
895
896        if is_docker and docker_tag is None:
897            raise exc.PyAirbyteInputError(
898                message="docker_tag is required when docker_image is specified",
899                context={"docker_image": docker_image},
900            )
901
902        if unique:
903            existing = self.list_custom_source_definitions(
904                definition_type="yaml" if is_yaml else "docker",
905            )
906            if any(d.name == name for d in existing):
907                raise exc.AirbyteDuplicateResourcesError(
908                    resource_type="custom_source_definition",
909                    resource_name=name,
910                )
911
912        if is_yaml:
913            manifest_dict: dict[str, Any]
914            if isinstance(manifest_yaml, Path):
915                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
916            elif isinstance(manifest_yaml, str):
917                manifest_dict = yaml.safe_load(manifest_yaml)
918            elif manifest_yaml is not None:
919                manifest_dict = manifest_yaml
920            else:
921                raise exc.PyAirbyteInputError(
922                    message="manifest_yaml is required for YAML connectors",
923                    context={"name": name},
924                )
925
926            if pre_validate:
927                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
928
929            result = api_util.create_custom_yaml_source_definition(
930                name=name,
931                workspace_id=self.workspace_id,
932                manifest=manifest_dict,
933                api_root=self.api_root,
934                client_id=self.client_id,
935                client_secret=self.client_secret,
936                bearer_token=self.bearer_token,
937            )
938            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
939                self, result
940            )
941
942            # Set testing values if provided
943            if testing_values is not None:
944                custom_definition.set_testing_values(testing_values)
945
946            return custom_definition
947
948        raise NotImplementedError(
949            "Docker custom source definitions are not yet supported. "
950            "Only YAML manifest-based custom sources are currently available."
951        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image are provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
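A minimal sketch publishing a YAML-based definition (the manifest path and testing values are hypothetical):

from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),  # a dict or YAML string also works
    testing_values={"api_key": "..."},    # hypothetical config; stored for the Builder UI
)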
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
953    def list_custom_source_definitions(
954        self,
955        *,
956        definition_type: Literal["yaml", "docker"],
957    ) -> list[CustomCloudSourceDefinition]:
958        """List custom source connector definitions.
959
960        Args:
961            definition_type: Connector type to list ("yaml" or "docker"). Required.
962
963        Returns:
964            List of CustomCloudSourceDefinition objects matching the specified type
965        """
966        if definition_type == "yaml":
967            yaml_definitions = api_util.list_custom_yaml_source_definitions(
968                workspace_id=self.workspace_id,
969                api_root=self.api_root,
970                client_id=self.client_id,
971                client_secret=self.client_secret,
972                bearer_token=self.bearer_token,
973            )
974            return [
975                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
976                for d in yaml_definitions
977            ]
978
979        raise NotImplementedError(
980            "Docker custom source definitions are not yet supported. "
981            "Only YAML manifest-based custom sources are currently available."
982        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type
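For example:

for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)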

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
 984    def get_custom_source_definition(
 985        self,
 986        definition_id: str,
 987        *,
 988        definition_type: Literal["yaml", "docker"],
 989    ) -> CustomCloudSourceDefinition:
 990        """Get a specific custom source definition by ID.
 991
 992        Args:
 993            definition_id: The definition ID
 994            definition_type: Connector type ("yaml" or "docker"). Required.
 995
 996        Returns:
 997            CustomCloudSourceDefinition object
 998        """
 999        if definition_type == "yaml":
1000            result = api_util.get_custom_yaml_source_definition(
1001                workspace_id=self.workspace_id,
1002                definition_id=definition_id,
1003                api_root=self.api_root,
1004                client_id=self.client_id,
1005                client_secret=self.client_secret,
1006                bearer_token=self.bearer_token,
1007            )
1008            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
1009
1010        raise NotImplementedError(
1011            "Docker custom source definitions are not yet supported. "
1012            "Only YAML manifest-based custom sources are currently available."
1013        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object
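For example, with a hypothetical definition ID:

definition = workspace.get_custom_source_definition(
    definition_id="definition-id-123",
    definition_type="yaml",
)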

class CloudConnection:
 21class CloudConnection:  # noqa: PLR0904  # Too many public methods
 22    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 23
 24    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 25    """
 26
 27    def __init__(
 28        self,
 29        workspace: CloudWorkspace,
 30        connection_id: str,
 31        source: str | None = None,
 32        destination: str | None = None,
 33    ) -> None:
 34        """It is not recommended to create a `CloudConnection` object directly.
 35
 36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 37        """
 38        self.connection_id = connection_id
 39        """The ID of the connection."""
 40
 41        self.workspace = workspace
 42        """The workspace that the connection belongs to."""
 43
 44        self._source_id = source
 45        """The ID of the source."""
 46
 47        self._destination_id = destination
 48        """The ID of the destination."""
 49
 50        self._connection_info: ConnectionResponse | None = None
 51        """The connection info object. (Cached.)"""
 52
 53        self._cloud_source_object: CloudSource | None = None
 54        """The source object. (Cached.)"""
 55
 56        self._cloud_destination_object: CloudDestination | None = None
 57        """The destination object. (Cached.)"""
 58
 59    def _fetch_connection_info(
 60        self,
 61        *,
 62        force_refresh: bool = False,
 63        verify: bool = True,
 64    ) -> ConnectionResponse:
 65        """Fetch and cache connection info from the API.
 66
 67        By default, this method will only fetch from the API if connection info is not
 68        already cached. It also verifies that the connection belongs to the expected
 69        workspace unless verification is explicitly disabled.
 70
 71        Args:
 72            force_refresh: If True, always fetch from the API even if cached.
 73                If False (default), only fetch if not already cached.
 74            verify: If True (default), verify that the connection is valid (e.g., that
 75                the workspace_id matches this object's workspace). Raises an error if
 76                validation fails.
 77
 78        Returns:
 79            The ConnectionResponse from the API.
 80
 81        Raises:
 82            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 83                workspace_id doesn't match the expected workspace.
 84            AirbyteMissingResourceError: If the connection doesn't exist.
 85        """
 86        if not force_refresh and self._connection_info is not None:
 87            # Use cached info, but still verify if requested
 88            if verify:
 89                self._verify_workspace_match(self._connection_info)
 90            return self._connection_info
 91
 92        # Fetch from API
 93        connection_info = api_util.get_connection(
 94            workspace_id=self.workspace.workspace_id,
 95            connection_id=self.connection_id,
 96            api_root=self.workspace.api_root,
 97            client_id=self.workspace.client_id,
 98            client_secret=self.workspace.client_secret,
 99            bearer_token=self.workspace.bearer_token,
100        )
101
102        # Cache the result first (before verification may raise)
103        self._connection_info = connection_info
104
105        # Verify if requested
106        if verify:
107            self._verify_workspace_match(connection_info)
108
109        return connection_info
110
111    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
112        """Verify that the connection belongs to the expected workspace.
113
114        Raises:
115            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
116        """
117        if connection_info.workspace_id != self.workspace.workspace_id:
118            raise AirbyteWorkspaceMismatchError(
119                resource_type="connection",
120                resource_id=self.connection_id,
121                workspace=self.workspace,
122                expected_workspace_id=self.workspace.workspace_id,
123                actual_workspace_id=connection_info.workspace_id,
124                message=(
125                    f"Connection '{self.connection_id}' belongs to workspace "
126                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
127                ),
128            )
129
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True
146
147    @classmethod
148    def _from_connection_response(
149        cls,
150        workspace: CloudWorkspace,
151        connection_response: ConnectionResponse,
152    ) -> CloudConnection:
153        """Create a CloudConnection from a ConnectionResponse."""
154        result = cls(
155            workspace=workspace,
156            connection_id=connection_response.connection_id,
157            source=connection_response.source_id,
158            destination=connection_response.destination_id,
159        )
160        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
161        return result
162
163    # Properties
164
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name
175
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id
186
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object
198
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id
209
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object
221
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]
229
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""
237
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
242
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"
247
248    # Run Sync
249
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result
279
280    def __repr__(self) -> str:
281        """String representation of the connection."""
282        return (
283            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
284            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
285        )
286
287    # Logs
288
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]
337
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )
364
365    # Artifacts
366
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])
388
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")
408
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self
428
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self
448
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self
473
474    # Enable/Disable
475
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
488
489    @enabled.setter
490    def enabled(self, value: bool) -> None:
491        """Set the enabled status of the connection.
492
493        Args:
494            value: True to enable (set status to 'active'), False to disable
495                (set status to 'inactive').
496        """
497        self.set_enabled(enabled=value)
498
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response
544
545    # Scheduling
546
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response
574
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response
592
593    # Deletions
594
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
27    def __init__(
28        self,
29        workspace: CloudWorkspace,
30        connection_id: str,
31        source: str | None = None,
32        destination: str | None = None,
33    ) -> None:
34        """It is not recommended to create a `CloudConnection` object directly.
35
36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
37        """
38        self.connection_id = connection_id
39        """The ID of the connection."""
40
41        self.workspace = workspace
42        """The workspace that the connection belongs to."""
43
44        self._source_id = source
45        """The ID of the source."""
46
47        self._destination_id = destination
48        """The ID of the destination."""
49
50        self._connection_info: ConnectionResponse | None = None
51        """The connection info object. (Cached.)"""
52
53        self._cloud_source_object: CloudSource | None = None
54        """The source object. (Cached.)"""
55
56        self._cloud_destination_object: CloudDestination | None = None
57        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
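For example, assuming a connection object is already in hand:

# Returns True, or raises if the connection is missing
# or belongs to a different workspace.
connection.check_is_valid()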
name: str | None
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object

Get the source object.

destination_id: str
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result

Run a sync.
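Beyond the default blocking call, here is a sketch of a non-blocking sync that waits later (the wait_for_completion arguments mirror those used in the source above):

sync_result = connection.run_sync(wait=False)
# ... do other work while the job runs ...
sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,
    raise_timeout=True,
)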

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True) -> list[SyncResult]:
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
Returns:

A list of SyncResult objects representing the sync jobs.
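A pagination sketch:

# Second page of 10 jobs, newest first
for sync_result in connection.get_previous_sync_logs(limit=10, offset=10):
    print(sync_result.job_id)  # the job ID each result was created with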

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
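For example, inspecting a specific historical job (the job ID is hypothetical):

sync_result = connection.get_sync_result(job_id=12345)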

def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])

Get the connection state artifacts.

Returns the persisted state for this connection, which can be used when debugging incremental syncs.

Uses the Config API endpoint: POST /v1/state/get

Returns:

List of state objects for each stream, or None if no state is set.
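For example:

states = connection.get_state_artifacts()
if states is None:
    print("No state persisted yet.")
else:
    for stream_state in states:
        print(stream_state)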

def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.
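A sketch, assuming the standard Config API syncCatalog shape with a top-level "streams" list:

catalog = connection.get_catalog_artifact()
for entry in (catalog or {}).get("streams", []):
    print(entry.get("stream", {}).get("name"))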

def rename(self, name: str) -> CloudConnection:
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
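
Because this call replaces the stream selection outright, it is worth reviewing the current selection first. A cautious sketch, assuming `connection` exists (stream_names is the connection property referenced elsewhere in this module):

print("Currently selected:", connection.stream_names)

# Destructive: this replaces the selection with exactly these streams.
# "users" and "orders" are hypothetical stream names.
connection.set_selected_streams(["users", "orders"])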

enabled: bool
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
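
Taken together, the enabled property and set_enabled() support a read-check-write pattern. A small sketch, assuming `connection` exists:

# Pause the connection only if it is currently active.
if connection.enabled:
    connection.set_enabled(enabled=False)

# Re-enabling later is safe even if already enabled, since ignore_noop
# defaults to True.
connection.set_enabled(enabled=True)
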
def set_schedule(self, cron_expression: str) -> None:
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.
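
To switch back to on-demand runs, a short sketch (again assuming `connection` exists):

# Disable automatic syncs; runs must now be triggered explicitly.
connection.set_manual_schedule()
sync_result = connection.run_sync()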

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
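
The cascade flags allow a connection and its endpoints to be removed in one call. A cautious sketch, assuming `connection` exists; note that deletion is irreversible:

# Delete the connection and its source, but keep the (possibly shared)
# destination in place.
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=False,
)
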
@dataclass
class CloudClientConfig:
 57@dataclass
 58class CloudClientConfig:
 59    """Client configuration for Airbyte Cloud API.
 60
 61    This class encapsulates the authentication and API configuration needed to connect
 62    to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
 63    exclusive authentication methods:
 64
 65    1. OAuth2 client credentials flow (client_id + client_secret)
 66    2. Bearer token authentication
 67
 68    Exactly one authentication method must be provided. Providing both or neither
 69    will raise a validation error.
 70
 71    Attributes:
 72        client_id: OAuth2 client ID for client credentials flow.
 73        client_secret: OAuth2 client secret for client credentials flow.
 74        bearer_token: Pre-generated bearer token for direct authentication.
 75        api_root: The API root URL. Defaults to Airbyte Cloud API.
 76    """
 77
 78    client_id: SecretString | None = None
 79    """OAuth2 client ID for client credentials authentication."""
 80
 81    client_secret: SecretString | None = None
 82    """OAuth2 client secret for client credentials authentication."""
 83
 84    bearer_token: SecretString | None = None
 85    """Bearer token for direct authentication (alternative to client credentials)."""
 86
 87    api_root: str = api_util.CLOUD_API_ROOT
 88    """The API root URL. Defaults to Airbyte Cloud API."""
 89
 90    def __post_init__(self) -> None:
 91        """Validate credentials and ensure secrets are properly wrapped."""
 92        # Wrap secrets in SecretString if they aren't already
 93        if self.client_id is not None:
 94            self.client_id = SecretString(self.client_id)
 95        if self.client_secret is not None:
 96            self.client_secret = SecretString(self.client_secret)
 97        if self.bearer_token is not None:
 98            self.bearer_token = SecretString(self.bearer_token)
 99
100        # Validate mutual exclusivity
101        has_client_credentials = self.client_id is not None or self.client_secret is not None
102        has_bearer_token = self.bearer_token is not None
103
104        if has_client_credentials and has_bearer_token:
105            raise PyAirbyteInputError(
106                message="Cannot use both client credentials and bearer token authentication.",
107                guidance=(
108                    "Provide either client_id and client_secret together, "
109                    "or bearer_token alone, but not both."
110                ),
111            )
112
113        if has_client_credentials and (self.client_id is None or self.client_secret is None):
114            # If using client credentials, both must be provided
115            raise PyAirbyteInputError(
116                message="Incomplete client credentials.",
117                guidance=(
118                    "When using client credentials authentication, "
119                    "both client_id and client_secret must be provided."
120                ),
121            )
122
123        if not has_client_credentials and not has_bearer_token:
124            raise PyAirbyteInputError(
125                message="No authentication credentials provided.",
126                guidance=(
127                    "Provide either client_id and client_secret together for OAuth2 "
128                    "client credentials flow, or bearer_token for direct authentication."
129                ),
130            )
131
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None
136
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None
141
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Client configuration for Airbyte Cloud API.

This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:

  1. OAuth2 client credentials flow (client_id + client_secret)
  2. Bearer token authentication

Exactly one authentication method must be provided. Providing both or neither will raise a validation error.

Attributes:
  • client_id: OAuth2 client ID for client credentials flow.
  • client_secret: OAuth2 client secret for client credentials flow.
  • bearer_token: Pre-generated bearer token for direct authentication.
  • api_root: The API root URL. Defaults to Airbyte Cloud API.
CloudClientConfig( client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, bearer_token: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1')
client_id: airbyte.secrets.SecretString | None = None

OAuth2 client ID for client credentials authentication.

client_secret: airbyte.secrets.SecretString | None = None

OAuth2 client secret for client credentials authentication.

bearer_token: airbyte.secrets.SecretString | None = None

Bearer token for direct authentication (alternative to client credentials).

api_root: str = 'https://api.airbyte.com/v1'

The API root URL. Defaults to Airbyte Cloud API.
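
The validation in __post_init__ enforces the exclusivity rule described above. A hedged sketch of what that looks like in practice (the credential values are placeholders):

from airbyte.cloud import CloudClientConfig

try:
    # Supplying both credential styles violates the exclusivity rule.
    CloudClientConfig(
        client_id="example-client-id",
        client_secret="example-client-secret",
        bearer_token="example-token",
    )
except Exception as exc:  # raises PyAirbyteInputError per the source above
    print(type(exc).__name__, exc)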

uses_bearer_token: bool
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None

Return True if using bearer token authentication.

uses_client_credentials: bool
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None

Return True if using client credentials authentication.

@classmethod
def from_env( cls, *, api_root: str | None = None) -> CloudClientConfig:
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Create CloudClientConfig from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.

Environment variables used:
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

The method will first check for a bearer token. If not found, it will attempt to use client credentials.

Arguments:
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:

A CloudClientConfig instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
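
Both construction paths below should yield an equivalent config; exactly one credential style may be supplied. A sketch, assuming the relevant environment variables are set for the from_env() path:

import airbyte as ab
from airbyte.cloud import CloudClientConfig

# Explicit bearer-token authentication:
config = CloudClientConfig(
    bearer_token=ab.get_secret("AIRBYTE_CLOUD_BEARER_TOKEN"),
)

# Or resolve credentials from the AIRBYTE_CLOUD_* environment variables:
config = CloudClientConfig.from_env()
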
@dataclass
class SyncResult:
218@dataclass
219class SyncResult:
220    """The result of a sync operation.
221
222    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
223    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
224    """
225
226    workspace: CloudWorkspace
227    connection: CloudConnection
228    job_id: int
229    table_name_prefix: str = ""
230    table_name_suffix: str = ""
231    _latest_job_info: JobResponse | None = None
232    _connection_response: ConnectionResponse | None = None
233    _cache: CacheBase | None = None
234    _job_with_attempts_info: dict[str, Any] | None = None
235
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"
247
248    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
249        """Return connection info for the sync job."""
250        if self._connection_response and not force_refresh:
251            return self._connection_response
252
253        self._connection_response = api_util.get_connection(
254            workspace_id=self.workspace.workspace_id,
255            api_root=self.workspace.api_root,
256            connection_id=self.connection.connection_id,
257            client_id=self.workspace.client_id,
258            client_secret=self.workspace.client_secret,
259            bearer_token=self.workspace.bearer_token,
260        )
261        return self._connection_response
262
263    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
264        """Return the destination configuration for the sync job."""
265        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
266        destination_response = api_util.get_destination(
267            destination_id=connection_info.destination_id,
268            api_root=self.workspace.api_root,
269            client_id=self.workspace.client_id,
270            client_secret=self.workspace.client_secret,
271            bearer_token=self.workspace.bearer_token,
272        )
273        return asdict(destination_response.configuration)
274
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES
278
279    def get_job_status(self) -> JobStatusEnum:
280        """Return the latest status of the sync job."""
281        return self._fetch_latest_job_info().status
282
283    def _fetch_latest_job_info(self) -> JobResponse:
284        """Return the job info for the sync job."""
285        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
286            return self._latest_job_info
287
288        self._latest_job_info = api_util.get_job_info(
289            job_id=self.job_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            bearer_token=self.workspace.bearer_token,
294        )
295        return self._latest_job_info
296
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of bytes synced."""
300        return self._fetch_latest_job_info().bytes_synced or 0
301
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records synced."""
305        return self._fetch_latest_job_info().rows_synced or 0
306
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise
326
327    def _fetch_job_with_attempts(self) -> dict[str, Any]:
328        """Fetch job info with attempts from Config API using lazy loading pattern."""
329        if self._job_with_attempts_info is not None:
330            return self._job_with_attempts_info
331
332        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
333            api_root=self.workspace.api_root,
334            path="/jobs/get",
335            json={
336                "id": self.job_id,
337            },
338            client_id=self.workspace.client_id,
339            client_secret=self.workspace.client_secret,
340            bearer_token=self.workspace.bearer_token,
341        )
342        return self._job_with_attempts_info
343
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]
359
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )
385
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
417
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache
426
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()
430
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
434
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)
441
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )
456
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()
461
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name
466
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the set of stream names."""
470        return self.connection.stream_names
471
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)
483
484    class _SyncResultStreams(Mapping[str, CachedDataset]):
485        """A mapping of stream names to cached datasets."""
486
487        def __init__(
488            self,
489            parent: SyncResult,
490            /,
491        ) -> None:
492            self.parent: SyncResult = parent
493
494        def __getitem__(self, key: str) -> CachedDataset:
495            return self.parent.get_dataset(stream_name=key)
496
497        def __iter__(self) -> Iterator[str]:
498            return iter(self.parent.stream_names)
499
500        def __len__(self) -> int:
501            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
279    def get_job_status(self) -> JobStatusEnum:
280        """Return the latest status of the sync job."""
281        return self._fetch_latest_job_info().status

Return the latest status of the sync job.

bytes_synced: int
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of bytes synced."""
300        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records synced."""
305        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.

start_time: datetime.datetime
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise

Return the start time of the sync job in UTC.

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]

Return a list of attempts for this sync job.
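
A quick sketch of inspecting retries, assuming `sync_result` is an existing SyncResult and that each SyncAttempt exposes the fields it is constructed with above:

for attempt in sync_result.get_attempts():
    # attempt_number is zero-based, matching enumerate(..., start=0) above.
    print(f"Attempt {attempt.attempt_number} of job {attempt.job_id}")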

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
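
A typical polling pattern, sketched under the assumption that `connection` exists and that run_sync() returns a SyncResult as shown in the module examples:

sync_result = connection.run_sync()

# Block until the job reaches a final status, raising on failure or timeout.
final_status = sync_result.wait_for_completion(
    wait_timeout=3600,
    raise_timeout=True,
    raise_failure=True,
)
print(f"Job {sync_result.job_id} finished with status: {final_status}")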

def get_sql_cache(self) -> airbyte.caches.CacheBase:
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.
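
Together with get_sql_engine(), these helpers support ad-hoc SQL against a supported destination. A sketch, assuming `sync_result` exists, the destination supports SQL reads, and "users" is a hypothetical stream name; the unquoted table name is naive and may need destination-specific quoting:

from sqlalchemy import text

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")

with engine.connect() as conn:
    # Count rows in the synced table.
    row_count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar_one()
    print(f"{table_name}: {row_count} rows")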

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the set of stream names."""
470        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
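
Since streams behaves as a standard mapping, datasets can be fetched by key or iterated. A short sketch, assuming `sync_result` exists and its destination supports SQL reads:

for stream_name, dataset in sync_result.streams.items():
    # Each value is the CachedDataset that get_dataset(stream_name) would return.
    print(stream_name, type(dataset).__name__)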

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of possible Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
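
Because JobStatusEnum subclasses str, its members compare equal to their plain-string values, which keeps status checks simple. A minimal sketch (the terminal-status set below is illustrative; the library's own FINAL_STATUSES constant is internal):

from airbyte.cloud.constants import JobStatusEnum

status = JobStatusEnum.SUCCEEDED
assert status == "succeeded"  # str subclass: equal to the raw value

terminal = {JobStatusEnum.SUCCEEDED, JobStatusEnum.FAILED, JobStatusEnum.CANCELLED}
print(status in terminal)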