airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the `SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import (
        client_config,
        connections,
        constants,
        sync_results,
        workspaces,
    )


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]

@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID as a `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication

Example with client credentials:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

Example with bearer token:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```
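
Because api_root defaults to the Airbyte Cloud API, pointing it at a self-managed deployment is all that is needed to reuse the same workflows against OSS or Enterprise. A minimal sketch (the URL below is illustrative, not a real endpoint):

```python
from airbyte.cloud import CloudWorkspace

# Self-managed (OSS/Enterprise) instance: override the default Cloud API root.
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
    api_root="https://airbyte.internal.example.com/api/public/v1",  # illustrative
)
```
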
CloudWorkspace(workspace_id: str, client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1', bearer_token: airbyte.secrets.SecretString | None = None)
workspace_id: str
client_id: airbyte.secrets.SecretString | None = None
client_secret: airbyte.secrets.SecretString | None = None
api_root: str = 'https://api.airbyte.com/v1'
bearer_token: airbyte.secrets.SecretString | None = None
@classmethod
def from_env(cls, workspace_id: str | None = None, *, api_root: str | None = None) -> CloudWorkspace:

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)

Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.

Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.

Example:

```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
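
As an illustration, the sketch below sets the documented environment variables in-process before calling from_env(); in practice these would come from your shell or a secrets manager, and the values shown are placeholders:

```python
import os

from airbyte.cloud import CloudWorkspace

# Placeholder values; normally set outside the Python process.
os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"] = "123"
os.environ["AIRBYTE_CLOUD_CLIENT_ID"] = "..."
os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"] = "..."

# Credentials and workspace ID are resolved from the environment.
workspace = CloudWorkspace.from_env()
```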
workspace_url: str | None

The web URL of the workspace.

def get_organization(self, *, raise_on_error: bool = True) -> airbyte.cloud.workspaces.CloudOrganization | None:

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
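
For callers that may hold only workspace-scoped credentials, a sketch using raise_on_error=False (assuming a workspace object from the examples above):

```python
# Tolerate missing ORGANIZATION_READER permissions instead of raising.
org = workspace.get_organization(raise_on_error=False)
if org is None:
    print("Organization info unavailable (insufficient permissions?)")
else:
    print(f"Organization: {org.organization_name} ({org.organization_id})")
```
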
def connect(self) -> None:

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
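
For example, as an optional sanity check during setup (assuming a workspace object from the examples above):

```python
# Raises if the workspace is unreachable or the credentials are wrong;
# prints the workspace URL on success.
workspace.connect()
```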

def get_connection(self, connection_id: str) -> CloudConnection:

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.
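
All three getters are lazy: they construct client-side objects without calling the API, so they are cheap to create up front. A sketch with placeholder IDs:

```python
# No API calls happen here; data is loaded lazily when first accessed.
connection = workspace.get_connection(connection_id="<connection-id>")
source = workspace.get_source(source_id="<source-id>")
destination = workspace.get_destination(destination_id="<destination-id>")
```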

def deploy_source(self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
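
A sketch using the source-faker connector (the name, config, and workspace object here are illustrative):

```python
import airbyte as ab

# Configure a local source object, then deploy it to the workspace.
source = ab.get_source("source-faker", config={"count": 100})
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # avoid collisions while unique=True
)
```
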
def deploy_destination(self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
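
When passing a plain dictionary, destinationType is required. A sketch with illustrative values:

```python
# Deploy from a raw config dict; "destinationType" must be present.
cloud_destination = workspace.deploy_destination(
    name="My Dev Null Destination",
    destination={
        "destinationType": "dev-null",  # illustrative connector type
        # ...connector-specific configuration values...
    },
)
```
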
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:

Delete a source from the workspace.

You can pass either the source ID as a str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
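
Because safe_mode defaults to True, the deletion only proceeds when the source name contains "delete-me" or "deleteme"; pass safe_mode=False to skip the check. A sketch (the IDs and objects are placeholders):

```python
# Succeeds under safe_mode only if the source's name contains "delete-me"
# or "deleteme" (case insensitive).
workspace.permanently_delete_source(source=cloud_source)

# Explicitly opt out of the name check (use with care):
workspace.permanently_delete_source(source="<source-id>", safe_mode=False)
```
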
def permanently_delete_destination(self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection(self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:
659    def deploy_connection(
660        self,
661        connection_name: str,
662        *,
663        source: CloudSource | str,
664        selected_streams: list[str],
665        destination: CloudDestination | str,
666        table_prefix: str | None = None,
667    ) -> CloudConnection:
668        """Create a new connection between an already deployed source and destination.
669
670        Returns the newly deployed connection object.
671
672        Args:
673            connection_name: The name of the connection.
674            source: The deployed source. You can pass a source ID or a CloudSource object.
675            destination: The deployed destination. You can pass a destination ID or a
676                CloudDestination object.
677            table_prefix: Optional. The table prefix to use when syncing to the destination.
678            selected_streams: The selected stream names to sync within the connection.
679        """
680        if not selected_streams:
681            raise exc.PyAirbyteInputError(
682                guidance="You must provide `selected_streams` when creating a connection."
683            )
684
685        source_id: str = source if isinstance(source, str) else source.connector_id
686        destination_id: str = (
687            destination if isinstance(destination, str) else destination.connector_id
688        )
689
690        deployed_connection = api_util.create_connection(
691            name=connection_name,
692            source_id=source_id,
693            destination_id=destination_id,
694            api_root=self.api_root,
695            workspace_id=self.workspace_id,
696            selected_stream_names=selected_streams,
697            prefix=table_prefix or "",
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701        )
702
703        return CloudConnection(
704            workspace=self,
705            connection_id=deployed_connection.connection_id,
706            source=deployed_connection.source_id,
707            destination=deployed_connection.destination_id,
708        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
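
For example, a minimal sketch (assuming `workspace` is an existing `CloudWorkspace`; the IDs are placeholders for already-deployed connectors):

```python
connection = workspace.deploy_connection(
    connection_name="users-to-warehouse",
    source="0000-example-source-id",
    destination="0000-example-destination-id",
    selected_streams=["users", "purchases"],  # required; must be non-empty
    table_prefix="raw_",  # optional; defaults to no prefix
)
print(connection.connection_id)
```
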
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
710    def permanently_delete_connection(
711        self,
712        connection: str | CloudConnection,
713        *,
714        cascade_delete_source: bool = False,
715        cascade_delete_destination: bool = False,
716        safe_mode: bool = True,
717    ) -> None:
718        """Delete a deployed connection from the workspace.
719
720        Args:
721            connection: The connection ID or CloudConnection object to delete
722            cascade_delete_source: If True, also delete the source after deleting the connection
723            cascade_delete_destination: If True, also delete the destination after deleting
724                the connection
725            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
726                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
727                to cascade deletes.
728        """
729        if connection is None:
730            raise ValueError("No connection ID provided.")
731
732        if isinstance(connection, str):
733            connection = CloudConnection(
734                workspace=self,
735                connection_id=connection,
736            )
737
738        api_util.delete_connection(
739            connection_id=connection.connection_id,
740            connection_name=connection.name,
741            api_root=self.api_root,
742            workspace_id=self.workspace_id,
743            client_id=self.client_id,
744            client_secret=self.client_secret,
745            bearer_token=self.bearer_token,
746            safe_mode=safe_mode,
747        )
748
749        if cascade_delete_source:
750            self.permanently_delete_source(
751                source=connection.source_id,
752                safe_mode=safe_mode,
753            )
754        if cascade_delete_destination:
755            self.permanently_delete_destination(
756                destination=connection.destination_id,
757                safe_mode=safe_mode,
758            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
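
For example, a minimal sketch of a cascading delete (assuming `connection` is an existing `CloudConnection` object or a connection ID string):

```python
workspace.permanently_delete_connection(
    connection=connection,
    cascade_delete_source=True,       # delete the source after the connection
    cascade_delete_destination=True,  # delete the destination as well
    safe_mode=True,  # name checks apply to the connection and both cascades
)
```
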
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
762    def list_connections(
763        self,
764        name: str | None = None,
765        *,
766        name_filter: Callable | None = None,
767    ) -> list[CloudConnection]:
768        """List connections by name in the workspace.
769
770        TODO: Add pagination support
771        """
772        connections = api_util.list_connections(
773            api_root=self.api_root,
774            workspace_id=self.workspace_id,
775            name=name,
776            name_filter=name_filter,
777            client_id=self.client_id,
778            client_secret=self.client_secret,
779            bearer_token=self.bearer_token,
780        )
781        return [
782            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
783                workspace=self,
784                connection_response=connection,
785            )
786            for connection in connections
787            if name is None or connection.name == name
788        ]

List connections by name in the workspace.

TODO: Add pagination support
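
For example, a minimal sketch (assuming `workspace` is an existing `CloudWorkspace`, and assuming `name_filter` is a predicate applied to each connection name, per the `name_filter: Callable` parameter above). The same `name` / `name_filter` pattern applies to `list_sources()` and `list_destinations()` below:

```python
# Exact-name match:
matches = workspace.list_connections(name="users-to-warehouse")

# Or filter by a predicate over the connection name:
prod_connections = workspace.list_connections(
    name_filter=lambda name: name.startswith("prod-"),
)
for connection in prod_connections:
    print(connection.connection_id, connection.name)
```
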

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
790    def list_sources(
791        self,
792        name: str | None = None,
793        *,
794        name_filter: Callable | None = None,
795    ) -> list[CloudSource]:
796        """List all sources in the workspace.
797
798        TODO: Add pagination support
799        """
800        sources = api_util.list_sources(
801            api_root=self.api_root,
802            workspace_id=self.workspace_id,
803            name=name,
804            name_filter=name_filter,
805            client_id=self.client_id,
806            client_secret=self.client_secret,
807            bearer_token=self.bearer_token,
808        )
809        return [
810            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
811                workspace=self,
812                source_response=source,
813            )
814            for source in sources
815            if name is None or source.name == name
816        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
818    def list_destinations(
819        self,
820        name: str | None = None,
821        *,
822        name_filter: Callable | None = None,
823    ) -> list[CloudDestination]:
824        """List all destinations in the workspace.
825
826        TODO: Add pagination support
827        """
828        destinations = api_util.list_destinations(
829            api_root=self.api_root,
830            workspace_id=self.workspace_id,
831            name=name,
832            name_filter=name_filter,
833            client_id=self.client_id,
834            client_secret=self.client_secret,
835            bearer_token=self.bearer_token,
836        )
837        return [
838            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
839                workspace=self,
840                destination_response=destination,
841            )
842            for destination in destinations
843            if name is None or destination.name == name
844        ]

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
846    def publish_custom_source_definition(
847        self,
848        name: str,
849        *,
850        manifest_yaml: dict[str, Any] | Path | str | None = None,
851        docker_image: str | None = None,
852        docker_tag: str | None = None,
853        unique: bool = True,
854        pre_validate: bool = True,
855        testing_values: dict[str, Any] | None = None,
856    ) -> CustomCloudSourceDefinition:
857        """Publish a custom source connector definition.
858
859        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
860        and docker_tag (for Docker connectors), but not both.
861
862        Args:
863            name: Display name for the connector definition
864            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
865            docker_image: Docker repository (e.g., 'airbyte/source-custom')
866            docker_tag: Docker image tag (e.g., '1.0.0')
867            unique: Whether to enforce name uniqueness
868            pre_validate: Whether to validate manifest client-side (YAML only)
869            testing_values: Optional configuration values to use for testing in the
870                Connector Builder UI. If provided, these values are stored as the complete
871                testing values object for the connector builder project (replaces any existing
872                values), allowing immediate test read operations.
873
874        Returns:
875            CustomCloudSourceDefinition object representing the created definition
876
877        Raises:
878            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
879            AirbyteDuplicateResourcesError: If unique=True and name already exists
880        """
881        is_yaml = manifest_yaml is not None
882        is_docker = docker_image is not None
883
884        if is_yaml == is_docker:
885            raise exc.PyAirbyteInputError(
886                message=(
887                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
888                    "docker_image + docker_tag (for Docker connectors), but not both"
889                ),
890                context={
891                    "manifest_yaml_provided": is_yaml,
892                    "docker_image_provided": is_docker,
893                },
894            )
895
896        if is_docker and docker_tag is None:
897            raise exc.PyAirbyteInputError(
898                message="docker_tag is required when docker_image is specified",
899                context={"docker_image": docker_image},
900            )
901
902        if unique:
903            existing = self.list_custom_source_definitions(
904                definition_type="yaml" if is_yaml else "docker",
905            )
906            if any(d.name == name for d in existing):
907                raise exc.AirbyteDuplicateResourcesError(
908                    resource_type="custom_source_definition",
909                    resource_name=name,
910                )
911
912        if is_yaml:
913            manifest_dict: dict[str, Any]
914            if isinstance(manifest_yaml, Path):
915                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
916            elif isinstance(manifest_yaml, str):
917                manifest_dict = yaml.safe_load(manifest_yaml)
918            elif manifest_yaml is not None:
919                manifest_dict = manifest_yaml
920            else:
921                raise exc.PyAirbyteInputError(
922                    message="manifest_yaml is required for YAML connectors",
923                    context={"name": name},
924                )
925
926            if pre_validate:
927                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
928
929            result = api_util.create_custom_yaml_source_definition(
930                name=name,
931                workspace_id=self.workspace_id,
932                manifest=manifest_dict,
933                api_root=self.api_root,
934                client_id=self.client_id,
935                client_secret=self.client_secret,
936                bearer_token=self.bearer_token,
937            )
938            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
939                self, result
940            )
941
942            # Set testing values if provided
943            if testing_values is not None:
944                custom_definition.set_testing_values(testing_values)
945
946            return custom_definition
947
948        raise NotImplementedError(
949            "Docker custom source definitions are not yet supported. "
950            "Only YAML manifest-based custom sources are currently available."
951        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
  CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
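
For example, a minimal sketch of publishing a YAML-based definition (assuming `workspace` is an existing `CloudWorkspace` and `manifest.yaml` is a low-code CDK manifest on disk):

```python
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),  # a dict or YAML string also works
    unique=True,        # raise AirbyteDuplicateResourcesError on a name clash
    pre_validate=True,  # validate the manifest client-side before upload
)
```
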
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
953    def list_custom_source_definitions(
954        self,
955        *,
956        definition_type: Literal["yaml", "docker"],
957    ) -> list[CustomCloudSourceDefinition]:
958        """List custom source connector definitions.
959
960        Args:
961            definition_type: Connector type to list ("yaml" or "docker"). Required.
962
963        Returns:
964            List of CustomCloudSourceDefinition objects matching the specified type
965        """
966        if definition_type == "yaml":
967            yaml_definitions = api_util.list_custom_yaml_source_definitions(
968                workspace_id=self.workspace_id,
969                api_root=self.api_root,
970                client_id=self.client_id,
971                client_secret=self.client_secret,
972                bearer_token=self.bearer_token,
973            )
974            return [
975                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
976                for d in yaml_definitions
977            ]
978
979        raise NotImplementedError(
980            "Docker custom source definitions are not yet supported. "
981            "Only YAML manifest-based custom sources are currently available."
982        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
  List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
 984    def get_custom_source_definition(
 985        self,
 986        definition_id: str,
 987        *,
 988        definition_type: Literal["yaml", "docker"],
 989    ) -> CustomCloudSourceDefinition:
 990        """Get a specific custom source definition by ID.
 991
 992        Args:
 993            definition_id: The definition ID
 994            definition_type: Connector type ("yaml" or "docker"). Required.
 995
 996        Returns:
 997            CustomCloudSourceDefinition object
 998        """
 999        if definition_type == "yaml":
1000            result = api_util.get_custom_yaml_source_definition(
1001                workspace_id=self.workspace_id,
1002                definition_id=definition_id,
1003                api_root=self.api_root,
1004                client_id=self.client_id,
1005                client_secret=self.client_secret,
1006                bearer_token=self.bearer_token,
1007            )
1008            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
1009
1010        raise NotImplementedError(
1011            "Docker custom source definitions are not yet supported. "
1012            "Only YAML manifest-based custom sources are currently available."
1013        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
  CustomCloudSourceDefinition object
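
For example, a minimal sketch combining the two lookups (assuming `workspace` is an existing `CloudWorkspace`; the definition ID is a placeholder):

```python
# List all YAML-based custom source definitions in the workspace:
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)

# Fetch a single definition by ID:
definition = workspace.get_custom_source_definition(
    "0000-example-definition-id",
    definition_type="yaml",
)
```
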

class CloudConnection:
 32class CloudConnection:  # noqa: PLR0904  # Too many public methods
 33    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 34
 35    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 36    """
 37
 38    def __init__(
 39        self,
 40        workspace: CloudWorkspace,
 41        connection_id: str,
 42        source: str | None = None,
 43        destination: str | None = None,
 44    ) -> None:
 45        """It is not recommended to create a `CloudConnection` object directly.
 46
 47        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 48        """
 49        self.connection_id = connection_id
 50        """The ID of the connection."""
 51
 52        self.workspace = workspace
 53        """The workspace that the connection belongs to."""
 54
 55        self._source_id = source
 56        """The ID of the source."""
 57
 58        self._destination_id = destination
 59        """The ID of the destination."""
 60
 61        self._connection_info: ConnectionResponse | None = None
 62        """The connection info object. (Cached.)"""
 63
 64        self._cloud_source_object: CloudSource | None = None
 65        """The source object. (Cached.)"""
 66
 67        self._cloud_destination_object: CloudDestination | None = None
 68        """The destination object. (Cached.)"""
 69
 70    def _fetch_connection_info(
 71        self,
 72        *,
 73        force_refresh: bool = False,
 74        verify: bool = True,
 75    ) -> ConnectionResponse:
 76        """Fetch and cache connection info from the API.
 77
 78        By default, this method will only fetch from the API if connection info is not
 79        already cached. It also verifies that the connection belongs to the expected
 80        workspace unless verification is explicitly disabled.
 81
 82        Args:
 83            force_refresh: If True, always fetch from the API even if cached.
 84                If False (default), only fetch if not already cached.
 85            verify: If True (default), verify that the connection is valid (e.g., that
 86                the workspace_id matches this object's workspace). Raises an error if
 87                validation fails.
 88
 89        Returns:
 90            The ConnectionResponse from the API.
 91
 92        Raises:
 93            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 94                workspace_id doesn't match the expected workspace.
 95            AirbyteMissingResourceError: If the connection doesn't exist.
 96        """
 97        if not force_refresh and self._connection_info is not None:
 98            # Use cached info, but still verify if requested
 99            if verify:
100                self._verify_workspace_match(self._connection_info)
101            return self._connection_info
102
103        # Fetch from API
104        connection_info = api_util.get_connection(
105            workspace_id=self.workspace.workspace_id,
106            connection_id=self.connection_id,
107            api_root=self.workspace.api_root,
108            client_id=self.workspace.client_id,
109            client_secret=self.workspace.client_secret,
110            bearer_token=self.workspace.bearer_token,
111        )
112
113        # Cache the result first (before verification may raise)
114        self._connection_info = connection_info
115
116        # Verify if requested
117        if verify:
118            self._verify_workspace_match(connection_info)
119
120        return connection_info
121
122    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
123        """Verify that the connection belongs to the expected workspace.
124
125        Raises:
126            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
127        """
128        if connection_info.workspace_id != self.workspace.workspace_id:
129            raise AirbyteWorkspaceMismatchError(
130                resource_type="connection",
131                resource_id=self.connection_id,
132                workspace=self.workspace,
133                expected_workspace_id=self.workspace.workspace_id,
134                actual_workspace_id=connection_info.workspace_id,
135                message=(
136                    f"Connection '{self.connection_id}' belongs to workspace "
137                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
138                ),
139            )
140
141    def check_is_valid(self) -> bool:
142        """Check if this connection exists and belongs to the expected workspace.
143
144        This method fetches connection info from the API (if not already cached) and
145        verifies that the connection's workspace_id matches the workspace associated
146        with this CloudConnection object.
147
148        Returns:
149            True if the connection exists and belongs to the expected workspace.
150
151        Raises:
152            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
153            AirbyteMissingResourceError: If the connection doesn't exist.
154        """
155        self._fetch_connection_info(force_refresh=False, verify=True)
156        return True
157
158    @classmethod
159    def _from_connection_response(
160        cls,
161        workspace: CloudWorkspace,
162        connection_response: ConnectionResponse,
163    ) -> CloudConnection:
164        """Create a CloudConnection from a ConnectionResponse."""
165        result = cls(
166            workspace=workspace,
167            connection_id=connection_response.connection_id,
168            source=connection_response.source_id,
169            destination=connection_response.destination_id,
170        )
171        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
172        return result
173
174    # Properties
175
176    @property
177    def name(self) -> str | None:
178        """Get the display name of the connection, if available.
179
180        E.g. "My Postgres to Snowflake", not the connection ID.
181        """
182        if not self._connection_info:
183            self._connection_info = self._fetch_connection_info()
184
185        return self._connection_info.name
186
187    @property
188    def source_id(self) -> str:
189        """The ID of the source."""
190        if not self._source_id:
191            if not self._connection_info:
192                self._connection_info = self._fetch_connection_info()
193
194            self._source_id = self._connection_info.source_id
195
196        return self._source_id
197
198    @property
199    def source(self) -> CloudSource:
200        """Get the source object."""
201        if self._cloud_source_object:
202            return self._cloud_source_object
203
204        self._cloud_source_object = CloudSource(
205            workspace=self.workspace,
206            connector_id=self.source_id,
207        )
208        return self._cloud_source_object
209
210    @property
211    def destination_id(self) -> str:
212        """The ID of the destination."""
213        if not self._destination_id:
214            if not self._connection_info:
215                self._connection_info = self._fetch_connection_info()
216
217            self._destination_id = self._connection_info.destination_id
218
219        return self._destination_id
220
221    @property
222    def destination(self) -> CloudDestination:
223        """Get the destination object."""
224        if self._cloud_destination_object:
225            return self._cloud_destination_object
226
227        self._cloud_destination_object = CloudDestination(
228            workspace=self.workspace,
229            connector_id=self.destination_id,
230        )
231        return self._cloud_destination_object
232
233    @property
234    def stream_names(self) -> list[str]:
235        """The stream names."""
236        if not self._connection_info:
237            self._connection_info = self._fetch_connection_info()
238
239        return [stream.name for stream in self._connection_info.configurations.streams or []]
240
241    @property
242    def table_prefix(self) -> str:
243        """The table prefix."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return self._connection_info.prefix or ""
248
249    @property
250    def connection_url(self) -> str | None:
251        """The web URL to the connection."""
252        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
253
254    @property
255    def job_history_url(self) -> str | None:
256        """The URL to the job history for the connection."""
257        return f"{self.connection_url}/timeline"
258
259    # Run Sync
260
261    def run_sync(
262        self,
263        *,
264        wait: bool = True,
265        wait_timeout: int = 300,
266    ) -> SyncResult:
267        """Run a sync."""
268        connection_response = api_util.run_connection(
269            connection_id=self.connection_id,
270            api_root=self.workspace.api_root,
271            workspace_id=self.workspace.workspace_id,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            bearer_token=self.workspace.bearer_token,
275        )
276        sync_result = SyncResult(
277            workspace=self.workspace,
278            connection=self,
279            job_id=connection_response.job_id,
280        )
281
282        if wait:
283            sync_result.wait_for_completion(
284                wait_timeout=wait_timeout,
285                raise_failure=True,
286                raise_timeout=True,
287            )
288
289        return sync_result
290
291    def __repr__(self) -> str:
292        """String representation of the connection."""
293        return (
294            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
295            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
296        )
297
298    # Logs
299
300    def get_previous_sync_logs(
301        self,
302        *,
303        limit: int = 20,
304        offset: int | None = None,
305        from_tail: bool = True,
306        job_type: JobTypeEnum | None = None,
307    ) -> list[SyncResult]:
308        """Get previous sync jobs for a connection with pagination support.
309
310        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
311        rows_synced, start_time). Full log text can be fetched lazily via
312        `SyncResult.get_full_log_text()`.
313
314        Args:
315            limit: Maximum number of jobs to return. Defaults to 20.
316            offset: Number of jobs to skip from the beginning. Defaults to None (0).
317            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
318                If False, returns jobs ordered oldest-first (createdAt ASC).
319                Defaults to True.
320            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
321                If not specified, defaults to sync and reset jobs only (API default behavior).
322
323        Returns:
324            A list of SyncResult objects representing the sync jobs.
325        """
326        order_by = (
327            api_util.JOB_ORDER_BY_CREATED_AT_DESC
328            if from_tail
329            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
330        )
331        sync_logs: list[JobResponse] = api_util.get_job_logs(
332            connection_id=self.connection_id,
333            api_root=self.workspace.api_root,
334            workspace_id=self.workspace.workspace_id,
335            limit=limit,
336            offset=offset,
337            order_by=order_by,
338            job_type=job_type,
339            client_id=self.workspace.client_id,
340            client_secret=self.workspace.client_secret,
341            bearer_token=self.workspace.bearer_token,
342        )
343        return [
344            SyncResult(
345                workspace=self.workspace,
346                connection=self,
347                job_id=sync_log.job_id,
348                _latest_job_info=sync_log,
349            )
350            for sync_log in sync_logs
351        ]
352
353    def get_sync_result(
354        self,
355        job_id: int | None = None,
356    ) -> SyncResult | None:
357        """Get the sync result for the connection.
358
359        If `job_id` is not provided, the most recent sync job will be used.
360
361        Returns `None` if job_id is omitted and no previous jobs are found.
362        """
363        if job_id is None:
364            # Get the most recent sync job
365            results = self.get_previous_sync_logs(
366                limit=1,
367            )
368            if results:
369                return results[0]
370
371            return None
372
373        # Get the sync job by ID (lazy loaded)
374        return SyncResult(
375            workspace=self.workspace,
376            connection=self,
377            job_id=job_id,
378        )
379
380    # Artifacts
381
382    @deprecated("Use 'dump_raw_state()' instead.")
383    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
384        """Deprecated. Use `dump_raw_state()` instead."""
385        state_response = api_util.get_connection_state(
386            connection_id=self.connection_id,
387            api_root=self.workspace.api_root,
388            client_id=self.workspace.client_id,
389            client_secret=self.workspace.client_secret,
390            bearer_token=self.workspace.bearer_token,
391        )
392        if state_response.get("stateType") == "not_set":
393            return None
394        return state_response.get("streamState", [])
395
396    def dump_raw_state(self) -> dict[str, Any]:
397        """Dump the full raw state for this connection.
398
399        Returns the connection's sync state as a raw dictionary from the API.
400        The result includes stateType, connectionId, and all state data.
401
402        The output of this method can be passed directly to `import_raw_state()`
403        on the same or a different connection (connectionId is overridden on import).
404
405        Returns:
406            The full connection state as a dictionary.
407        """
408        return api_util.get_connection_state(
409            connection_id=self.connection_id,
410            api_root=self.workspace.api_root,
411            client_id=self.workspace.client_id,
412            client_secret=self.workspace.client_secret,
413            bearer_token=self.workspace.bearer_token,
414        )
415
416    def import_raw_state(
417        self,
418        connection_state_dict: dict[str, Any],
419    ) -> dict[str, Any]:
420        """Import (restore) the full raw state for this connection.
421
422        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
423        > could result in broken connections, and/or incorrect sync behavior.
424
425        Replaces the entire connection state with the provided state blob.
426        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
427
428        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
429        The `connectionId` in the blob is always overridden with this connection's
430        ID, making state blobs portable across connections.
431
432        Args:
433            connection_state_dict: The full connection state dict to import. Must include:
434                - stateType: "global", "stream", or "legacy"
435                - One of: state (legacy), streamState (stream), globalState (global)
436
437        Returns:
438            The updated connection state as a dictionary.
439
440        Raises:
441            AirbyteConnectionSyncActiveError: If a sync is currently running on this
442                connection (HTTP 423). Wait for the sync to complete before retrying.
443        """
444        return api_util.replace_connection_state(
445            connection_id=self.connection_id,
446            connection_state_dict=connection_state_dict,
447            api_root=self.workspace.api_root,
448            client_id=self.workspace.client_id,
449            client_secret=self.workspace.client_secret,
450            bearer_token=self.workspace.bearer_token,
451        )
452
453    def get_stream_state(
454        self,
455        stream_name: str,
456        stream_namespace: str | None = None,
457    ) -> dict[str, Any] | None:
458        """Get the state blob for a single stream within this connection.
459
460        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
461        not the full connection state envelope.
462
463        This is compatible with `stream`-type state and stream-level entries
464        within a `global`-type state. It is not compatible with `legacy` state.
465        To get or set the entire connection-level state artifact, use
466        `dump_raw_state` and `import_raw_state` instead.
467
468        Args:
469            stream_name: The name of the stream to get state for.
470            stream_namespace: The source-side stream namespace. This refers to the
471                namespace from the source (e.g., database schema), not any destination
472                namespace override set in connection advanced settings.
473
474        Returns:
475            The stream's state blob as a dictionary, or None if the stream is not found.
476        """
477        state_data = self.dump_raw_state()
478        result = ConnectionStateResponse(**state_data)
479
480        streams = _get_stream_list(result)
481        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
482
483        if not matching:
484            available = [s.stream_descriptor.name for s in streams]
485            logger.warning(
486                "Stream '%s' not found in connection state for connection '%s'. "
487                "Available streams: %s",
488                stream_name,
489                self.connection_id,
490                available,
491            )
492            return None
493
494        return matching[0].stream_state
495
496    def set_stream_state(
497        self,
498        stream_name: str,
499        state_blob_dict: dict[str, Any],
500        stream_namespace: str | None = None,
501    ) -> None:
502        """Set the state for a single stream within this connection.
503
504        Fetches the current full state, replaces only the specified stream's state,
505        then sends the full updated state back to the API. If the stream does not
506        exist in the current state, it is appended.
507
508        This is compatible with `stream`-type state and stream-level entries
509        within a `global`-type state. It is not compatible with `legacy` state.
510        To get or set the entire connection-level state artifact, use
511        `dump_raw_state` and `import_raw_state` instead.
512
513        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
514
515        Args:
516            stream_name: The name of the stream to update state for.
517            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
518            stream_namespace: The source-side stream namespace. This refers to the
519                namespace from the source (e.g., database schema), not any destination
520                namespace override set in connection advanced settings.
521
522        Raises:
523            PyAirbyteInputError: If the connection state type is not supported for
524                stream-level operations (not_set, legacy).
525            AirbyteConnectionSyncActiveError: If a sync is currently running on this
526                connection (HTTP 423). Wait for the sync to complete before retrying.
527        """
528        state_data = self.dump_raw_state()
529        current = ConnectionStateResponse(**state_data)
530
531        if current.state_type == "not_set":
532            raise PyAirbyteInputError(
533                message="Cannot set stream state: connection has no existing state.",
534                context={"connection_id": self.connection_id},
535            )
536
537        if current.state_type == "legacy":
538            raise PyAirbyteInputError(
539                message="Cannot set stream state on a legacy-type connection state.",
540                context={"connection_id": self.connection_id},
541            )
542
543        new_stream_entry = {
544            "streamDescriptor": {
545                "name": stream_name,
546                **(
547                    {
548                        "namespace": stream_namespace,
549                    }
550                    if stream_namespace
551                    else {}
552                ),
553            },
554            "streamState": state_blob_dict,
555        }
556
557        raw_streams: list[dict[str, Any]]
558        if current.state_type == "stream":
559            raw_streams = state_data.get("streamState", [])
560        elif current.state_type == "global":
561            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
562        else:
563            raw_streams = []
564
565        streams = _get_stream_list(current)
566        found = False
567        updated_streams_raw: list[dict[str, Any]] = []
568        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
569            if _match_stream(parsed_s, stream_name, stream_namespace):
570                updated_streams_raw.append(new_stream_entry)
571                found = True
572            else:
573                updated_streams_raw.append(raw_s)
574
575        if not found:
576            updated_streams_raw.append(new_stream_entry)
577
578        full_state: dict[str, Any] = {
579            **state_data,
580        }
581
582        if current.state_type == "stream":
583            full_state["streamState"] = updated_streams_raw
584        elif current.state_type == "global":
585            original_global = state_data.get("globalState", {})
586            full_state["globalState"] = {
587                **original_global,
588                "streamStates": updated_streams_raw,
589            }
590
591        self.import_raw_state(full_state)
592
593    @deprecated("Use 'dump_raw_catalog()' instead.")
594    def get_catalog_artifact(self) -> dict[str, Any] | None:
595        """Get the configured catalog for this connection.
596
597        Returns the full configured catalog (syncCatalog) for this connection,
598        including stream schemas, sync modes, cursor fields, and primary keys.
599
600        Uses the Config API endpoint: POST /v1/web_backend/connections/get
601
602        Returns:
603            Dictionary containing the configured catalog, or `None` if not found.
604        """
605        return self.dump_raw_catalog()
606
607    def dump_raw_catalog(self) -> dict[str, Any] | None:
608        """Dump the full configured catalog (syncCatalog) for this connection.
609
610        Returns the raw catalog dict as returned by the API, including stream
611        schemas, sync modes, cursor fields, and primary keys.
612
613        The returned dict can be passed to `import_raw_catalog()` on this or
614        another connection to restore or clone the catalog configuration.
615
616        Returns:
617            Dictionary containing the configured catalog, or `None` if not found.
618        """
619        connection_response = api_util.get_connection_catalog(
620            connection_id=self.connection_id,
621            api_root=self.workspace.api_root,
622            client_id=self.workspace.client_id,
623            client_secret=self.workspace.client_secret,
624            bearer_token=self.workspace.bearer_token,
625        )
626        return connection_response.get("syncCatalog")
627
628    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
629        """Replace the configured catalog for this connection.
630
631        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
632        > could result in broken connections, and/or incorrect sync behavior.
633
634        Accepts a configured catalog dict and replaces the connection's entire
635        catalog with it. All other connection settings remain unchanged.
636
637        The catalog shape should match the output of `dump_raw_catalog()`:
638        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.
639
640        Args:
641            catalog: The configured catalog dict to set.
642        """
643        api_util.replace_connection_catalog(
644            connection_id=self.connection_id,
645            configured_catalog_dict=catalog,
646            api_root=self.workspace.api_root,
647            client_id=self.workspace.client_id,
648            client_secret=self.workspace.client_secret,
649            bearer_token=self.workspace.bearer_token,
650        )
651
652    def rename(self, name: str) -> CloudConnection:
653        """Rename the connection.
654
655        Args:
656            name: New name for the connection
657
658        Returns:
659            Updated CloudConnection object with refreshed info
660        """
661        updated_response = api_util.patch_connection(
662            connection_id=self.connection_id,
663            api_root=self.workspace.api_root,
664            client_id=self.workspace.client_id,
665            client_secret=self.workspace.client_secret,
666            bearer_token=self.workspace.bearer_token,
667            name=name,
668        )
669        self._connection_info = updated_response
670        return self
671
672    def set_table_prefix(self, prefix: str) -> CloudConnection:
673        """Set the table prefix for the connection.
674
675        Args:
676            prefix: New table prefix to use when syncing to the destination
677
678        Returns:
679            Updated CloudConnection object with refreshed info
680        """
681        updated_response = api_util.patch_connection(
682            connection_id=self.connection_id,
683            api_root=self.workspace.api_root,
684            client_id=self.workspace.client_id,
685            client_secret=self.workspace.client_secret,
686            bearer_token=self.workspace.bearer_token,
687            prefix=prefix,
688        )
689        self._connection_info = updated_response
690        return self
691
692    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
693        """Set the selected streams for the connection.
694
695        This is a destructive operation that can break existing connections if the
696        stream selection is changed incorrectly. Use with caution.
697
698        Args:
699            stream_names: List of stream names to sync
700
701        Returns:
702            Updated CloudConnection object with refreshed info
703        """
704        configurations = api_util.build_stream_configurations(stream_names)
705
706        updated_response = api_util.patch_connection(
707            connection_id=self.connection_id,
708            api_root=self.workspace.api_root,
709            client_id=self.workspace.client_id,
710            client_secret=self.workspace.client_secret,
711            bearer_token=self.workspace.bearer_token,
712            configurations=configurations,
713        )
714        self._connection_info = updated_response
715        return self
716
717    # Enable/Disable
718
719    @property
720    def enabled(self) -> bool:
721        """Get the current enabled status of the connection.
722
723        This property always fetches fresh data from the API to ensure accuracy,
724        as another process or user may have toggled the setting.
725
726        Returns:
727            True if the connection status is 'active', False otherwise.
728        """
729        connection_info = self._fetch_connection_info(force_refresh=True)
730        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
731
732    @enabled.setter
733    def enabled(self, value: bool) -> None:
734        """Set the enabled status of the connection.
735
736        Args:
737            value: True to enable (set status to 'active'), False to disable
738                (set status to 'inactive').
739        """
740        self.set_enabled(enabled=value)
741
742    def set_enabled(
743        self,
744        *,
745        enabled: bool,
746        ignore_noop: bool = True,
747    ) -> None:
748        """Set the enabled status of the connection.
749
750        Args:
751            enabled: True to enable (set status to 'active'), False to disable
752                (set status to 'inactive').
753            ignore_noop: If True (default), silently return if the connection is already
754                in the requested state. If False, raise ValueError when the requested
755                state matches the current state.
756
757        Raises:
758            ValueError: If ignore_noop is False and the connection is already in the
759                requested state.
760        """
761        # Always fetch fresh data to check current status
762        connection_info = self._fetch_connection_info(force_refresh=True)
763        current_status = connection_info.status
764        desired_status = (
765            api_util.models.ConnectionStatusEnum.ACTIVE
766            if enabled
767            else api_util.models.ConnectionStatusEnum.INACTIVE
768        )
769
770        if current_status == desired_status:
771            if ignore_noop:
772                return
773            raise ValueError(
774                f"Connection is already {'enabled' if enabled else 'disabled'}. "
775                f"Current status: {current_status}"
776            )
777
778        updated_response = api_util.patch_connection(
779            connection_id=self.connection_id,
780            api_root=self.workspace.api_root,
781            client_id=self.workspace.client_id,
782            client_secret=self.workspace.client_secret,
783            bearer_token=self.workspace.bearer_token,
784            status=desired_status,
785        )
786        self._connection_info = updated_response
787
788    # Scheduling
789
790    def set_schedule(
791        self,
792        cron_expression: str,
793    ) -> None:
794        """Set a cron schedule for the connection.
795
796        Args:
797            cron_expression: A cron expression defining when syncs should run.
798
799        Examples:
800                - "0 0 * * *" - Daily at midnight UTC
801                - "0 */6 * * *" - Every 6 hours
802                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
803        """
804        schedule = api_util.models.AirbyteAPIConnectionSchedule(
805            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
806            cron_expression=cron_expression,
807        )
808        updated_response = api_util.patch_connection(
809            connection_id=self.connection_id,
810            api_root=self.workspace.api_root,
811            client_id=self.workspace.client_id,
812            client_secret=self.workspace.client_secret,
813            bearer_token=self.workspace.bearer_token,
814            schedule=schedule,
815        )
816        self._connection_info = updated_response
817
818    def set_manual_schedule(self) -> None:
819        """Set the connection to manual scheduling.
820
821        Disables automatic syncs. Syncs will only run when manually triggered.
822        """
823        schedule = api_util.models.AirbyteAPIConnectionSchedule(
824            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
825        )
826        updated_response = api_util.patch_connection(
827            connection_id=self.connection_id,
828            api_root=self.workspace.api_root,
829            client_id=self.workspace.client_id,
830            client_secret=self.workspace.client_secret,
831            bearer_token=self.workspace.bearer_token,
832            schedule=schedule,
833        )
834        self._connection_info = updated_response
835
836    # Deletions
837
838    def permanently_delete(
839        self,
840        *,
841        cascade_delete_source: bool = False,
842        cascade_delete_destination: bool = False,
843    ) -> None:
844        """Delete the connection.
845
846        Args:
847            cascade_delete_source: Whether to also delete the source.
848            cascade_delete_destination: Whether to also delete the destination.
849        """
850        self.workspace.permanently_delete_connection(self)
851
852        if cascade_delete_source:
853            self.workspace.permanently_delete_source(self.source_id)
854
855        if cascade_delete_destination:
856            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
38    def __init__(
39        self,
40        workspace: CloudWorkspace,
41        connection_id: str,
42        source: str | None = None,
43        destination: str | None = None,
44    ) -> None:
45        """It is not recommended to create a `CloudConnection` object directly.
46
47        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
48        """
49        self.connection_id = connection_id
50        """The ID of the connection."""
51
52        self.workspace = workspace
53        """The workspace that the connection belongs to."""
54
55        self._source_id = source
56        """The ID of the source."""
57
58        self._destination_id = destination
59        """The ID of the destination."""
60
61        self._connection_info: ConnectionResponse | None = None
62        """The connection info object. (Cached.)"""
63
64        self._cloud_source_object: CloudSource | None = None
65        """The source object. (Cached.)"""
66
67        self._cloud_destination_object: CloudDestination | None = None
68        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
141    def check_is_valid(self) -> bool:
142        """Check if this connection exists and belongs to the expected workspace.
143
144        This method fetches connection info from the API (if not already cached) and
145        verifies that the connection's workspace_id matches the workspace associated
146        with this CloudConnection object.
147
148        Returns:
149            True if the connection exists and belongs to the expected workspace.
150
151        Raises:
152            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
153            AirbyteMissingResourceError: If the connection doesn't exist.
154        """
155        self._fetch_connection_info(force_refresh=False, verify=True)
156        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:
  True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
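
For example, a minimal sketch (assuming `connection` is an existing `CloudConnection`, and assuming both exception classes are importable from `airbyte.exceptions`):

```python
from airbyte.exceptions import (  # import path is an assumption
    AirbyteMissingResourceError,
    AirbyteWorkspaceMismatchError,
)

try:
    connection.check_is_valid()
except AirbyteMissingResourceError:
    print("Connection does not exist.")
except AirbyteWorkspaceMismatchError:
    print("Connection belongs to a different workspace.")
```
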
name: str | None
176    @property
177    def name(self) -> str | None:
178        """Get the display name of the connection, if available.
179
180        E.g. "My Postgres to Snowflake", not the connection ID.
181        """
182        if not self._connection_info:
183            self._connection_info = self._fetch_connection_info()
184
185        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
187    @property
188    def source_id(self) -> str:
189        """The ID of the source."""
190        if not self._source_id:
191            if not self._connection_info:
192                self._connection_info = self._fetch_connection_info()
193
194            self._source_id = self._connection_info.source_id
195
196        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
198    @property
199    def source(self) -> CloudSource:
200        """Get the source object."""
201        if self._cloud_source_object:
202            return self._cloud_source_object
203
204        self._cloud_source_object = CloudSource(
205            workspace=self.workspace,
206            connector_id=self.source_id,
207        )
208        return self._cloud_source_object

Get the source object.

destination_id: str
210    @property
211    def destination_id(self) -> str:
212        """The ID of the destination."""
213        if not self._destination_id:
214            if not self._connection_info:
215                self._connection_info = self._fetch_connection_info()
216
217            self._destination_id = self._connection_info.destination_id
218
219        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
221    @property
222    def destination(self) -> CloudDestination:
223        """Get the destination object."""
224        if self._cloud_destination_object:
225            return self._cloud_destination_object
226
227        self._cloud_destination_object = CloudDestination(
228            workspace=self.workspace,
229            connector_id=self.destination_id,
230        )
231        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
233    @property
234    def stream_names(self) -> list[str]:
235        """The names of the streams configured on this connection."""
236        if not self._connection_info:
237            self._connection_info = self._fetch_connection_info()
238
239        return [stream.name for stream in self._connection_info.configurations.streams or []]

The names of the streams configured on this connection.

table_prefix: str
241    @property
242    def table_prefix(self) -> str:
243        """The table prefix used when syncing to the destination."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return self._connection_info.prefix or ""

The table prefix used when syncing to the destination.

connection_url: str | None
249    @property
250    def connection_url(self) -> str | None:
251        """The web URL to the connection."""
252        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
254    @property
255    def job_history_url(self) -> str | None:
256        """The URL to the job history for the connection."""
257        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
261    def run_sync(
262        self,
263        *,
264        wait: bool = True,
265        wait_timeout: int = 300,
266    ) -> SyncResult:
267        """Trigger a sync for this connection, optionally waiting for completion."""
268        connection_response = api_util.run_connection(
269            connection_id=self.connection_id,
270            api_root=self.workspace.api_root,
271            workspace_id=self.workspace.workspace_id,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            bearer_token=self.workspace.bearer_token,
275        )
276        sync_result = SyncResult(
277            workspace=self.workspace,
278            connection=self,
279            job_id=connection_response.job_id,
280        )
281
282        if wait:
283            sync_result.wait_for_completion(
284                wait_timeout=wait_timeout,
285                raise_failure=True,
286                raise_timeout=True,
287            )
288
289        return sync_result

Trigger a sync for this connection, optionally waiting for completion.
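
For long-running jobs, a minimal non-blocking sketch (assuming `connection` is an existing `CloudConnection` object; the timeout value is illustrative):

```python
# Trigger the sync without blocking...
sync_result = connection.run_sync(wait=False)

# ...then wait up to 10 minutes for a final status, raising on failure:
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,
)
print(final_status)
```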

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True, job_type: airbyte_api.models.jobtypeenum.JobTypeEnum | None = None) -> list[SyncResult]:
300    def get_previous_sync_logs(
301        self,
302        *,
303        limit: int = 20,
304        offset: int | None = None,
305        from_tail: bool = True,
306        job_type: JobTypeEnum | None = None,
307    ) -> list[SyncResult]:
308        """Get previous sync jobs for a connection with pagination support.
309
310        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
311        rows_synced, start_time). Full log text can be fetched lazily via
312        `SyncResult.get_full_log_text()`.
313
314        Args:
315            limit: Maximum number of jobs to return. Defaults to 20.
316            offset: Number of jobs to skip from the beginning. Defaults to None (0).
317            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
318                If False, returns jobs ordered oldest-first (createdAt ASC).
319                Defaults to True.
320            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
321                If not specified, defaults to sync and reset jobs only (API default behavior).
322
323        Returns:
324            A list of SyncResult objects representing the sync jobs.
325        """
326        order_by = (
327            api_util.JOB_ORDER_BY_CREATED_AT_DESC
328            if from_tail
329            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
330        )
331        sync_logs: list[JobResponse] = api_util.get_job_logs(
332            connection_id=self.connection_id,
333            api_root=self.workspace.api_root,
334            workspace_id=self.workspace.workspace_id,
335            limit=limit,
336            offset=offset,
337            order_by=order_by,
338            job_type=job_type,
339            client_id=self.workspace.client_id,
340            client_secret=self.workspace.client_secret,
341            bearer_token=self.workspace.bearer_token,
342        )
343        return [
344            SyncResult(
345                workspace=self.workspace,
346                connection=self,
347                job_id=sync_log.job_id,
348                _latest_job_info=sync_log,
349            )
350            for sync_log in sync_logs
351        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
  • job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:

A list of SyncResult objects representing the sync jobs.
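
A minimal pagination sketch (assuming `connection` is an existing `CloudConnection` object; the page size is illustrative):

```python
# Fetch the 40 most recent jobs, newest first, in two pages:
page_1 = connection.get_previous_sync_logs(limit=20)
page_2 = connection.get_previous_sync_logs(limit=20, offset=20)

for result in page_1 + page_2:
    print(result.job_id, result.get_job_status())
```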

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
353    def get_sync_result(
354        self,
355        job_id: int | None = None,
356    ) -> SyncResult | None:
357        """Get the sync result for the connection.
358
359        If `job_id` is not provided, the most recent sync job will be used.
360
361        Returns `None` if job_id is omitted and no previous jobs are found.
362        """
363        if job_id is None:
364            # Get the most recent sync job
365            results = self.get_previous_sync_logs(
366                limit=1,
367            )
368            if results:
369                return results[0]
370
371            return None
372
373        # Get the sync job by ID (lazy loaded)
374        return SyncResult(
375            workspace=self.workspace,
376            connection=self,
377            job_id=job_id,
378        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
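
A short sketch of both call styles (assuming `connection` is an existing `CloudConnection` object; the job ID is illustrative):

```python
# Most recent sync job, or None if the connection has never run:
latest = connection.get_sync_result()
if latest is not None:
    print(latest.get_job_status())

# A specific job by ID; the result is lazy-loaded:
result = connection.get_sync_result(job_id=12345)
```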

@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
382    @deprecated("Use 'dump_raw_state()' instead.")
383    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
384        """Deprecated. Use `dump_raw_state()` instead."""
385        state_response = api_util.get_connection_state(
386            connection_id=self.connection_id,
387            api_root=self.workspace.api_root,
388            client_id=self.workspace.client_id,
389            client_secret=self.workspace.client_secret,
390            bearer_token=self.workspace.bearer_token,
391        )
392        if state_response.get("stateType") == "not_set":
393            return None
394        return state_response.get("streamState", [])

Deprecated. Use dump_raw_state() instead.

def dump_raw_state(self) -> dict[str, typing.Any]:
396    def dump_raw_state(self) -> dict[str, Any]:
397        """Dump the full raw state for this connection.
398
399        Returns the connection's sync state as a raw dictionary from the API.
400        The result includes stateType, connectionId, and all state data.
401
402        The output of this method can be passed directly to `import_raw_state()`
403        on the same or a different connection (connectionId is overridden on import).
404
405        Returns:
406            The full connection state as a dictionary.
407        """
408        return api_util.get_connection_state(
409            connection_id=self.connection_id,
410            api_root=self.workspace.api_root,
411            client_id=self.workspace.client_id,
412            client_secret=self.workspace.client_secret,
413            bearer_token=self.workspace.bearer_token,
414        )

Dump the full raw state for this connection.

Returns the connection's sync state as a raw dictionary from the API. The result includes stateType, connectionId, and all state data.

The output of this method can be passed directly to import_raw_state() on the same or a different connection (connectionId is overridden on import).

Returns:

The full connection state as a dictionary.
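
A minimal backup sketch (assuming `connection` is an existing `CloudConnection` object; the file path is illustrative):

```python
import json

# Dump the full state blob and persist it for a later restore:
state_backup = connection.dump_raw_state()
with open("connection_state_backup.json", "w") as f:
    json.dump(state_backup, f, indent=2)
```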

def import_raw_state( self, connection_state_dict: dict[str, typing.Any]) -> dict[str, typing.Any]:
416    def import_raw_state(
417        self,
418        connection_state_dict: dict[str, Any],
419    ) -> dict[str, Any]:
420        """Import (restore) the full raw state for this connection.
421
422        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
423        > could result in broken connections and/or incorrect sync behavior.
424
425        Replaces the entire connection state with the provided state blob.
426        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
427
428        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
429        The `connectionId` in the blob is always overridden with this connection's
430        ID, making state blobs portable across connections.
431
432        Args:
433            connection_state_dict: The full connection state dict to import. Must include:
434                - stateType: "global", "stream", or "legacy"
435                - One of: state (legacy), streamState (stream), globalState (global)
436
437        Returns:
438            The updated connection state as a dictionary.
439
440        Raises:
441            AirbyteConnectionSyncActiveError: If a sync is currently running on this
442                connection (HTTP 423). Wait for the sync to complete before retrying.
443        """
444        return api_util.replace_connection_state(
445            connection_id=self.connection_id,
446            connection_state_dict=connection_state_dict,
447            api_root=self.workspace.api_root,
448            client_id=self.workspace.client_id,
449            client_secret=self.workspace.client_secret,
450            bearer_token=self.workspace.bearer_token,
451        )

Import (restore) the full raw state for this connection.

⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections and/or incorrect sync behavior.

Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

This is the counterpart to dump_raw_state() for backup/restore workflows. The connectionId in the blob is always overridden with this connection's ID, making state blobs portable across connections.

Arguments:
  • connection_state_dict: The full connection state dict to import. Must include:
    • stateType: "global", "stream", or "legacy"
    • One of: state (legacy), streamState (stream), globalState (global)
Returns:

The updated connection state as a dictionary.

Raises:
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
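
The restore half of the backup workflow above, as a minimal sketch (same assumptions; the file path is illustrative):

```python
import json

# Load a previously dumped state blob. The connectionId inside the
# blob is overridden on import, so this works on the same connection
# or on a different one.
with open("connection_state_backup.json") as f:
    state_backup = json.load(f)

connection.import_raw_state(state_backup)
```
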
def get_stream_state( self, stream_name: str, stream_namespace: str | None = None) -> dict[str, typing.Any] | None:
453    def get_stream_state(
454        self,
455        stream_name: str,
456        stream_namespace: str | None = None,
457    ) -> dict[str, Any] | None:
458        """Get the state blob for a single stream within this connection.
459
460        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
461        not the full connection state envelope.
462
463        This is compatible with `stream`-type state and stream-level entries
464        within a `global`-type state. It is not compatible with `legacy` state.
465        To get or set the entire connection-level state artifact, use
466        `dump_raw_state` and `import_raw_state` instead.
467
468        Args:
469            stream_name: The name of the stream to get state for.
470            stream_namespace: The source-side stream namespace. This refers to the
471                namespace from the source (e.g., database schema), not any destination
472                namespace override set in connection advanced settings.
473
474        Returns:
475            The stream's state blob as a dictionary, or None if the stream is not found.
476        """
477        state_data = self.dump_raw_state()
478        result = ConnectionStateResponse(**state_data)
479
480        streams = _get_stream_list(result)
481        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
482
483        if not matching:
484            available = [s.stream_descriptor.name for s in streams]
485            logger.warning(
486                "Stream '%s' not found in connection state for connection '%s'. "
487                "Available streams: %s",
488                stream_name,
489                self.connection_id,
490                available,
491            )
492            return None
493
494        return matching[0].stream_state

Get the state blob for a single stream within this connection.

Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Arguments:
  • stream_name: The name of the stream to get state for.
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:

The stream's state blob as a dictionary, or None if the stream is not found.
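
A minimal sketch (assuming `connection` is an existing `CloudConnection` object; the stream name and namespace are illustrative):

```python
# Read the state blob for a single stream:
users_state = connection.get_stream_state(
    stream_name="users",
    stream_namespace="public",
)
if users_state is not None:
    print(users_state)  # e.g. {"cursor": "2024-01-01"}
```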

def set_stream_state( self, stream_name: str, state_blob_dict: dict[str, typing.Any], stream_namespace: str | None = None) -> None:
496    def set_stream_state(
497        self,
498        stream_name: str,
499        state_blob_dict: dict[str, Any],
500        stream_namespace: str | None = None,
501    ) -> None:
502        """Set the state for a single stream within this connection.
503
504        Fetches the current full state, replaces only the specified stream's state,
505        then sends the full updated state back to the API. If the stream does not
506        exist in the current state, it is appended.
507
508        This is compatible with `stream`-type state and stream-level entries
509        within a `global`-type state. It is not compatible with `legacy` state.
510        To get or set the entire connection-level state artifact, use
511        `dump_raw_state` and `import_raw_state` instead.
512
513        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
514
515        Args:
516            stream_name: The name of the stream to update state for.
517            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
518            stream_namespace: The source-side stream namespace. This refers to the
519                namespace from the source (e.g., database schema), not any destination
520                namespace override set in connection advanced settings.
521
522        Raises:
523            PyAirbyteInputError: If the connection state type is not supported for
524                stream-level operations (not_set, legacy).
525            AirbyteConnectionSyncActiveError: If a sync is currently running on this
526                connection (HTTP 423). Wait for the sync to complete before retrying.
527        """
528        state_data = self.dump_raw_state()
529        current = ConnectionStateResponse(**state_data)
530
531        if current.state_type == "not_set":
532            raise PyAirbyteInputError(
533                message="Cannot set stream state: connection has no existing state.",
534                context={"connection_id": self.connection_id},
535            )
536
537        if current.state_type == "legacy":
538            raise PyAirbyteInputError(
539                message="Cannot set stream state on a legacy-type connection state.",
540                context={"connection_id": self.connection_id},
541            )
542
543        new_stream_entry = {
544            "streamDescriptor": {
545                "name": stream_name,
546                **(
547                    {
548                        "namespace": stream_namespace,
549                    }
550                    if stream_namespace
551                    else {}
552                ),
553            },
554            "streamState": state_blob_dict,
555        }
556
557        raw_streams: list[dict[str, Any]]
558        if current.state_type == "stream":
559            raw_streams = state_data.get("streamState", [])
560        elif current.state_type == "global":
561            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
562        else:
563            raw_streams = []
564
565        streams = _get_stream_list(current)
566        found = False
567        updated_streams_raw: list[dict[str, Any]] = []
568        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
569            if _match_stream(parsed_s, stream_name, stream_namespace):
570                updated_streams_raw.append(new_stream_entry)
571                found = True
572            else:
573                updated_streams_raw.append(raw_s)
574
575        if not found:
576            updated_streams_raw.append(new_stream_entry)
577
578        full_state: dict[str, Any] = {
579            **state_data,
580        }
581
582        if current.state_type == "stream":
583            full_state["streamState"] = updated_streams_raw
584        elif current.state_type == "global":
585            original_global = state_data.get("globalState", {})
586            full_state["globalState"] = {
587                **original_global,
588                "streamStates": updated_streams_raw,
589            }
590
591        self.import_raw_state(full_state)

Set the state for a single stream within this connection.

Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Uses the safe variant that prevents updates while a sync is running (HTTP 423).

Arguments:
  • stream_name: The name of the stream to update state for.
  • state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
  • PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
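
A minimal sketch that rewinds one stream's cursor (assuming `connection` is an existing `CloudConnection` object; the stream name, namespace, and cursor shape are illustrative):

```python
# Replace only the `users` stream's state; all other streams keep
# their current state. Fails with HTTP 423 if a sync is running.
connection.set_stream_state(
    stream_name="users",
    state_blob_dict={"cursor": "2024-01-01"},
    stream_namespace="public",
)
```
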
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
593    @deprecated("Use 'dump_raw_catalog()' instead.")
594    def get_catalog_artifact(self) -> dict[str, Any] | None:
595        """Get the configured catalog for this connection.
596
597        Returns the full configured catalog (syncCatalog) for this connection,
598        including stream schemas, sync modes, cursor fields, and primary keys.
599
600        Uses the Config API endpoint: POST /v1/web_backend/connections/get
601
602        Returns:
603            Dictionary containing the configured catalog, or `None` if not found.
604        """
605        return self.dump_raw_catalog()

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.

def dump_raw_catalog(self) -> dict[str, typing.Any] | None:
607    def dump_raw_catalog(self) -> dict[str, Any] | None:
608        """Dump the full configured catalog (syncCatalog) for this connection.
609
610        Returns the raw catalog dict as returned by the API, including stream
611        schemas, sync modes, cursor fields, and primary keys.
612
613        The returned dict can be passed to `import_raw_catalog()` on this or
614        another connection to restore or clone the catalog configuration.
615
616        Returns:
617            Dictionary containing the configured catalog, or `None` if not found.
618        """
619        connection_response = api_util.get_connection_catalog(
620            connection_id=self.connection_id,
621            api_root=self.workspace.api_root,
622            client_id=self.workspace.client_id,
623            client_secret=self.workspace.client_secret,
624            bearer_token=self.workspace.bearer_token,
625        )
626        return connection_response.get("syncCatalog")

Dump the full configured catalog (syncCatalog) for this connection.

Returns the raw catalog dict as returned by the API, including stream schemas, sync modes, cursor fields, and primary keys.

The returned dict can be passed to import_raw_catalog() on this or another connection to restore or clone the catalog configuration.

Returns:

Dictionary containing the configured catalog, or None if not found.
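
A minimal sketch that lists the configured streams from the dumped catalog (assuming `connection` is an existing `CloudConnection` object):

```python
catalog = connection.dump_raw_catalog()
if catalog is not None:
    # Catalog shape: {"streams": [{"stream": {...}, "config": {...}}, ...]}
    print([entry["stream"]["name"] for entry in catalog["streams"]])
```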

def import_raw_catalog(self, catalog: dict[str, typing.Any]) -> None:
628    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
629        """Replace the configured catalog for this connection.
630
631        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
632        > could result in broken connections and/or incorrect sync behavior.
633
634        Accepts a configured catalog dict and replaces the connection's entire
635        catalog with it. All other connection settings remain unchanged.
636
637        The catalog shape should match the output of `dump_raw_catalog()`:
638        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.
639
640        Args:
641            catalog: The configured catalog dict to set.
642        """
643        api_util.replace_connection_catalog(
644            connection_id=self.connection_id,
645            configured_catalog_dict=catalog,
646            api_root=self.workspace.api_root,
647            client_id=self.workspace.client_id,
648            client_secret=self.workspace.client_secret,
649            bearer_token=self.workspace.bearer_token,
650        )

Replace the configured catalog for this connection.

⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections and/or incorrect sync behavior.

Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.

The catalog shape should match the output of dump_raw_catalog(): {"streams": [{"stream": {...}, "config": {...}}, ...]}.

Arguments:
  • catalog: The configured catalog dict to set.
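
A minimal clone sketch (assuming `source_connection` and `target_connection` are existing `CloudConnection` objects with compatible sources; both names are illustrative):

```python
# Copy one connection's configured catalog onto another:
catalog = source_connection.dump_raw_catalog()
if catalog is not None:
    target_connection.import_raw_catalog(catalog)
```
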
def rename(self, name: str) -> CloudConnection:
652    def rename(self, name: str) -> CloudConnection:
653        """Rename the connection.
654
655        Args:
656            name: New name for the connection
657
658        Returns:
659            Updated CloudConnection object with refreshed info
660        """
661        updated_response = api_util.patch_connection(
662            connection_id=self.connection_id,
663            api_root=self.workspace.api_root,
664            client_id=self.workspace.client_id,
665            client_secret=self.workspace.client_secret,
666            bearer_token=self.workspace.bearer_token,
667            name=name,
668        )
669        self._connection_info = updated_response
670        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
672    def set_table_prefix(self, prefix: str) -> CloudConnection:
673        """Set the table prefix for the connection.
674
675        Args:
676            prefix: New table prefix to use when syncing to the destination
677
678        Returns:
679            Updated CloudConnection object with refreshed info
680        """
681        updated_response = api_util.patch_connection(
682            connection_id=self.connection_id,
683            api_root=self.workspace.api_root,
684            client_id=self.workspace.client_id,
685            client_secret=self.workspace.client_secret,
686            bearer_token=self.workspace.bearer_token,
687            prefix=prefix,
688        )
689        self._connection_info = updated_response
690        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
692    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
693        """Set the selected streams for the connection.
694
695        This is a destructive operation that can break existing connections if the
696        stream selection is changed incorrectly. Use with caution.
697
698        Args:
699            stream_names: List of stream names to sync
700
701        Returns:
702            Updated CloudConnection object with refreshed info
703        """
704        configurations = api_util.build_stream_configurations(stream_names)
705
706        updated_response = api_util.patch_connection(
707            connection_id=self.connection_id,
708            api_root=self.workspace.api_root,
709            client_id=self.workspace.client_id,
710            client_secret=self.workspace.client_secret,
711            bearer_token=self.workspace.bearer_token,
712            configurations=configurations,
713        )
714        self._connection_info = updated_response
715        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
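
A minimal sketch (assuming `connection` is an existing `CloudConnection` object; the stream names are illustrative):

```python
# Select exactly these streams for syncing; streams not listed are
# deselected, so use with caution:
connection = connection.set_selected_streams(["users", "orders"])
print(connection.stream_names)
```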

enabled: bool
719    @property
720    def enabled(self) -> bool:
721        """Get the current enabled status of the connection.
722
723        This property always fetches fresh data from the API to ensure accuracy,
724        as another process or user may have toggled the setting.
725
726        Returns:
727            True if the connection status is 'active', False otherwise.
728        """
729        connection_info = self._fetch_connection_info(force_refresh=True)
730        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
742    def set_enabled(
743        self,
744        *,
745        enabled: bool,
746        ignore_noop: bool = True,
747    ) -> None:
748        """Set the enabled status of the connection.
749
750        Args:
751            enabled: True to enable (set status to 'active'), False to disable
752                (set status to 'inactive').
753            ignore_noop: If True (default), silently return if the connection is already
754                in the requested state. If False, raise ValueError when the requested
755                state matches the current state.
756
757        Raises:
758            ValueError: If ignore_noop is False and the connection is already in the
759                requested state.
760        """
761        # Always fetch fresh data to check current status
762        connection_info = self._fetch_connection_info(force_refresh=True)
763        current_status = connection_info.status
764        desired_status = (
765            api_util.models.ConnectionStatusEnum.ACTIVE
766            if enabled
767            else api_util.models.ConnectionStatusEnum.INACTIVE
768        )
769
770        if current_status == desired_status:
771            if ignore_noop:
772                return
773            raise ValueError(
774                f"Connection is already {'enabled' if enabled else 'disabled'}. "
775                f"Current status: {current_status}"
776            )
777
778        updated_response = api_util.patch_connection(
779            connection_id=self.connection_id,
780            api_root=self.workspace.api_root,
781            client_id=self.workspace.client_id,
782            client_secret=self.workspace.client_secret,
783            bearer_token=self.workspace.bearer_token,
784            status=desired_status,
785        )
786        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
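
A minimal sketch (assuming `connection` is an existing `CloudConnection` object):

```python
# Pause the connection; re-enabling works the same way with enabled=True:
connection.set_enabled(enabled=False)

# With ignore_noop=False, a redundant call raises ValueError:
try:
    connection.set_enabled(enabled=False, ignore_noop=False)
except ValueError as ex:
    print(ex)
```
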
def set_schedule(self, cron_expression: str) -> None:
790    def set_schedule(
791        self,
792        cron_expression: str,
793    ) -> None:
794        """Set a cron schedule for the connection.
795
796        Args:
797            cron_expression: A cron expression defining when syncs should run.
798
799        Examples:
800                - "0 0 * * *" - Daily at midnight UTC
801                - "0 */6 * * *" - Every 6 hours
802                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
803        """
804        schedule = api_util.models.AirbyteAPIConnectionSchedule(
805            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
806            cron_expression=cron_expression,
807        )
808        updated_response = api_util.patch_connection(
809            connection_id=self.connection_id,
810            api_root=self.workspace.api_root,
811            client_id=self.workspace.client_id,
812            client_secret=self.workspace.client_secret,
813            bearer_token=self.workspace.bearer_token,
814            schedule=schedule,
815        )
816        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
818    def set_manual_schedule(self) -> None:
819        """Set the connection to manual scheduling.
820
821        Disables automatic syncs. Syncs will only run when manually triggered.
822        """
823        schedule = api_util.models.AirbyteAPIConnectionSchedule(
824            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
825        )
826        updated_response = api_util.patch_connection(
827            connection_id=self.connection_id,
828            api_root=self.workspace.api_root,
829            client_id=self.workspace.client_id,
830            client_secret=self.workspace.client_secret,
831            bearer_token=self.workspace.bearer_token,
832            schedule=schedule,
833        )
834        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
838    def permanently_delete(
839        self,
840        *,
841        cascade_delete_source: bool = False,
842        cascade_delete_destination: bool = False,
843    ) -> None:
844        """Delete the connection.
845
846        Args:
847            cascade_delete_source: Whether to also delete the source.
848            cascade_delete_destination: Whether to also delete the destination.
849        """
850        self.workspace.permanently_delete_connection(self)
851
852        if cascade_delete_source:
853            self.workspace.permanently_delete_source(self.source_id)
854
855        if cascade_delete_destination:
856            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.

@dataclass
class CloudClientConfig:
 57@dataclass
 58class CloudClientConfig:
 59    """Client configuration for Airbyte Cloud API.
 60
 61    This class encapsulates the authentication and API configuration needed to connect
 62    to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
 63    exclusive authentication methods:
 64
 65    1. OAuth2 client credentials flow (client_id + client_secret)
 66    2. Bearer token authentication
 67
 68    Exactly one authentication method must be provided. Providing both or neither
 69    will raise a validation error.
 70
 71    Attributes:
 72        client_id: OAuth2 client ID for client credentials flow.
 73        client_secret: OAuth2 client secret for client credentials flow.
 74        bearer_token: Pre-generated bearer token for direct authentication.
 75        api_root: The API root URL. Defaults to Airbyte Cloud API.
 76    """
 77
 78    client_id: SecretString | None = None
 79    """OAuth2 client ID for client credentials authentication."""
 80
 81    client_secret: SecretString | None = None
 82    """OAuth2 client secret for client credentials authentication."""
 83
 84    bearer_token: SecretString | None = None
 85    """Bearer token for direct authentication (alternative to client credentials)."""
 86
 87    api_root: str = api_util.CLOUD_API_ROOT
 88    """The API root URL. Defaults to Airbyte Cloud API."""
 89
 90    def __post_init__(self) -> None:
 91        """Validate credentials and ensure secrets are properly wrapped."""
 92        # Wrap secrets in SecretString if they aren't already
 93        if self.client_id is not None:
 94            self.client_id = SecretString(self.client_id)
 95        if self.client_secret is not None:
 96            self.client_secret = SecretString(self.client_secret)
 97        if self.bearer_token is not None:
 98            self.bearer_token = SecretString(self.bearer_token)
 99
100        # Validate mutual exclusivity
101        has_client_credentials = self.client_id is not None or self.client_secret is not None
102        has_bearer_token = self.bearer_token is not None
103
104        if has_client_credentials and has_bearer_token:
105            raise PyAirbyteInputError(
106                message="Cannot use both client credentials and bearer token authentication.",
107                guidance=(
108                    "Provide either client_id and client_secret together, "
109                    "or bearer_token alone, but not both."
110                ),
111            )
112
113        if has_client_credentials and (self.client_id is None or self.client_secret is None):
114            # If using client credentials, both must be provided
115            raise PyAirbyteInputError(
116                message="Incomplete client credentials.",
117                guidance=(
118                    "When using client credentials authentication, "
119                    "both client_id and client_secret must be provided."
120                ),
121            )
122
123        if not has_client_credentials and not has_bearer_token:
124            raise PyAirbyteInputError(
125                message="No authentication credentials provided.",
126                guidance=(
127                    "Provide either client_id and client_secret together for OAuth2 "
128                    "client credentials flow, or bearer_token for direct authentication."
129                ),
130            )
131
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None
136
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None
141
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Client configuration for Airbyte Cloud API.

This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:

  1. OAuth2 client credentials flow (client_id + client_secret)
  2. Bearer token authentication

Exactly one authentication method must be provided. Providing both or neither will raise a validation error.

Attributes:
  • client_id: OAuth2 client ID for client credentials flow.
  • client_secret: OAuth2 client secret for client credentials flow.
  • bearer_token: Pre-generated bearer token for direct authentication.
  • api_root: The API root URL. Defaults to Airbyte Cloud API.
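
A minimal sketch of both authentication styles (the credential values are illustrative placeholders; plain strings are wrapped in `SecretString` automatically by `__post_init__`):

```python
from airbyte.cloud.client_config import CloudClientConfig

# Option 1: OAuth2 client credentials (both fields required together):
config = CloudClientConfig(
    client_id="my-client-id",
    client_secret="my-client-secret",
)

# Option 2: a pre-generated bearer token (mutually exclusive with the above):
config = CloudClientConfig(bearer_token="my-bearer-token")
```
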
CloudClientConfig( client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, bearer_token: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1')
client_id: airbyte.secrets.SecretString | None = None

OAuth2 client ID for client credentials authentication.

client_secret: airbyte.secrets.SecretString | None = None

OAuth2 client secret for client credentials authentication.

bearer_token: airbyte.secrets.SecretString | None = None

Bearer token for direct authentication (alternative to client credentials).

api_root: str = 'https://api.airbyte.com/v1'

The API root URL. Defaults to Airbyte Cloud API.

uses_bearer_token: bool
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None

Return True if using bearer token authentication.

uses_client_credentials: bool
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None

Return True if using client credentials authentication.

@classmethod
def from_env( cls, *, api_root: str | None = None) -> CloudClientConfig:
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Create CloudClientConfig from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.

Environment variables used:
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

The method will first check for a bearer token. If not found, it will attempt to use client credentials.

Arguments:
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:

A CloudClientConfig instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
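
A minimal sketch (assuming the bearer-token variable is set in the environment; the token value is illustrative):

```python
import os

from airbyte.cloud.client_config import CloudClientConfig

# The bearer token is checked before client credentials:
os.environ["AIRBYTE_CLOUD_BEARER_TOKEN"] = "my-bearer-token"

config = CloudClientConfig.from_env()
print(config.uses_bearer_token)  # True
```
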
@dataclass
class SyncResult:
218@dataclass
219class SyncResult:
220    """The result of a sync operation.
221
222    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
223    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
224    """
225
226    workspace: CloudWorkspace
227    connection: CloudConnection
228    job_id: int
229    table_name_prefix: str = ""
230    table_name_suffix: str = ""
231    _latest_job_info: JobResponse | None = None
232    _connection_response: ConnectionResponse | None = None
233    _cache: CacheBase | None = None
234    _job_with_attempts_info: dict[str, Any] | None = None
235
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"
247
248    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
249        """Return connection info for the sync job."""
250        if self._connection_response and not force_refresh:
251            return self._connection_response
252
253        self._connection_response = api_util.get_connection(
254            workspace_id=self.workspace.workspace_id,
255            api_root=self.workspace.api_root,
256            connection_id=self.connection.connection_id,
257            client_id=self.workspace.client_id,
258            client_secret=self.workspace.client_secret,
259            bearer_token=self.workspace.bearer_token,
260        )
261        return self._connection_response
262
263    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
264        """Return the destination configuration for the sync job."""
265        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
266        destination_response = api_util.get_destination(
267            destination_id=connection_info.destination_id,
268            api_root=self.workspace.api_root,
269            client_id=self.workspace.client_id,
270            client_secret=self.workspace.client_secret,
271            bearer_token=self.workspace.bearer_token,
272        )
273        return asdict(destination_response.configuration)
274
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES
278
279    def get_job_status(self) -> JobStatusEnum:
280        """Return the latest status of the sync job."""
281        return self._fetch_latest_job_info().status
282
283    def _fetch_latest_job_info(self) -> JobResponse:
284        """Return the job info for the sync job."""
285        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
286            return self._latest_job_info
287
288        self._latest_job_info = api_util.get_job_info(
289            job_id=self.job_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            bearer_token=self.workspace.bearer_token,
294        )
295        return self._latest_job_info
296
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of bytes synced."""
300        return self._fetch_latest_job_info().bytes_synced or 0
301
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records processed."""
305        return self._fetch_latest_job_info().rows_synced or 0
306
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise
326
327    def _fetch_job_with_attempts(self) -> dict[str, Any]:
328        """Fetch job info with attempts from Config API using lazy loading pattern."""
329        if self._job_with_attempts_info is not None:
330            return self._job_with_attempts_info
331
332        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
333            api_root=self.workspace.api_root,
334            path="/jobs/get",
335            json={
336                "id": self.job_id,
337            },
338            client_id=self.workspace.client_id,
339            client_secret=self.workspace.client_secret,
340            bearer_token=self.workspace.bearer_token,
341        )
342        return self._job_with_attempts_info
343
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]
359
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )
385
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
417
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache
426
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()
430
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
434
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)
441
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )
456
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()
461
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name
466
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the list of stream names."""
470        return self.connection.stream_names
471
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)
483
484    class _SyncResultStreams(Mapping[str, CachedDataset]):
485        """A mapping of stream names to cached datasets."""
486
487        def __init__(
488            self,
489            parent: SyncResult,
490            /,
491        ) -> None:
492            self.parent: SyncResult = parent
493
494        def __getitem__(self, key: str) -> CachedDataset:
495            return self.parent.get_dataset(stream_name=key)
496
497        def __iter__(self) -> Iterator[str]:
498            return iter(self.parent.stream_names)
499
500        def __len__(self) -> int:
501            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the .CloudWorkspace and .CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return self.connection.job_history_url

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
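
For example, a monitoring script might surface this URL to users directly (a minimal sketch, assuming `sync_result` was obtained from `connection.run_sync()` as in the module examples):

```python
# Assuming we've already obtained a `sync_result` object...
print(f"View job history in the Airbyte UI: {sync_result.job_url}")
```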

def is_job_complete(self) -> bool:
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
279    def get_job_status(self) -> JobStatusEnum:
280        """Return the latest status of the sync job."""
281        return self._fetch_latest_job_info().status

Return the latest status of the sync job.
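
Together with `is_job_complete()`, this supports a simple non-blocking status check (a sketch, again assuming a `sync_result` object):

```python
# Assuming we've already obtained a `sync_result` object...
if sync_result.is_job_complete():
    print(f"Job finished with status: {sync_result.get_job_status()}")
else:
    print("Job is still running...")
```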

bytes_synced: int
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of bytes synced."""
300        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records synced."""
305        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.
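
The two counters can be combined into a simple post-sync summary (sketch):

```python
# Assuming we've already obtained a `sync_result` object...
print(
    f"Synced {sync_result.records_synced} records "
    f"({sync_result.bytes_synced} bytes) in job {sync_result.job_id}."
)
```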

start_time: datetime.datetime
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise

Return the start time of the sync job in UTC.
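
Because this is a standard `datetime.datetime`, it can be formatted or compared directly (sketch):

```python
# Assuming we've already obtained a `sync_result` object...
print(f"Job {sync_result.job_id} started at {sync_result.start_time.isoformat()} UTC")
```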

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]

Return a list of attempts for this sync job.
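
For example, to enumerate the retries of a flaky job (a sketch; the `attempt_number` attribute is inferred from the constructor arguments shown in the source above):

```python
# Assuming we've already obtained a `sync_result` object...
for attempt in sync_result.get_attempts():
    print(f"Job {sync_result.job_id}, attempt #{attempt.attempt_number}")
```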

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
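
A typical pattern is to refresh the status and convert a failed job into a handled exception (a sketch; the import path for `AirbyteConnectionSyncError` is an assumption):

```python
from airbyte.exceptions import AirbyteConnectionSyncError  # assumed import path

# Assuming we've already obtained a `sync_result` object...
try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync job {sync_result.job_id} failed: {ex}")
```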

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
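
For example, to block until the job reaches a final status, raising on failure but merely returning the non-final status on timeout (sketch):

```python
# Assuming we've already obtained a `sync_result` object...
final_status = sync_result.wait_for_completion(
    wait_timeout=1800,    # default: 30 minutes
    raise_timeout=False,  # return the non-final status instead of raising
    raise_failure=True,   # raise AirbyteConnectionSyncError if the job failed
)
print(f"Job ended with status: {final_status}")
```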

def get_sql_cache(self) -> airbyte.caches.CacheBase:
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.
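
As the source above shows, the result is memoized in `self._cache`, so repeated calls return the same object (sketch; requires a supported destination such as Snowflake or BigQuery):

```python
# Assuming we've already obtained a `sync_result` object...
cache = sync_result.get_sql_cache()
assert cache is sync_result.get_sql_cache()  # memoized on the SyncResult
print(type(cache).__name__)
```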

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.
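
Combined with `get_sql_engine()`, this enables ad-hoc SQL against the destination (a sketch; "users" is a hypothetical stream name, and the table may need schema qualification depending on the destination):

```python
import sqlalchemy

# Assuming we've already obtained a `sync_result` object...
table_name = sync_result.get_sql_table_name("users")
with sync_result.get_sql_engine().connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")  # table name comes from the API
    ).scalar_one()
print(f"{table_name} contains {row_count} rows")
```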

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.
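
The table object can be used to build typed queries instead of raw SQL strings (sketch; "users" is again a hypothetical stream name):

```python
import sqlalchemy

# Assuming we've already obtained a `sync_result` object...
users_table = sync_result.get_sql_table("users")
stmt = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```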

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
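
Beyond direct iteration, the dataset can be handed to pandas for analysis (a sketch, assuming a `to_pandas()` helper on `CachedDataset` and a hypothetical "users" stream):

```python
# Assuming we've already obtained a `sync_result` object...
users = sync_result.get_dataset("users")
df = users.to_pandas()  # assumed helper on CachedDataset
print(df.head())
```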

def get_sql_database_name(self) -> str:
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name

Return the SQL schema name.
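
Together with `get_sql_database_name()`, this locates the synced tables within the destination (sketch):

```python
# Assuming we've already obtained a `sync_result` object...
print(f"Database: {sync_result.get_sql_database_name()}")
print(f"Schema:   {sync_result.get_sql_schema_name()}")
```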

stream_names: list[str]
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the list of stream names."""
470        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
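
Because `streams` is a `Mapping`, the standard dict-style protocol applies (sketch; `to_sql_table()` is the `CachedDataset` method shown in the module examples):

```python
# Assuming we've already obtained a `sync_result` object...
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name} -> table {dataset.to_sql_table().name}")
```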

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of the possible statuses of an Airbyte Cloud sync job.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
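
Since the enum subclasses `str`, status checks can be written against symbolic names or raw strings (a sketch; the `airbyte.cloud.constants` import path is assumed):

```python
from airbyte.cloud.constants import JobStatusEnum  # assumed import path

# Assuming we've already obtained a `sync_result` object...
status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded!")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED):
    print(f"Sync did not complete: {status}")
```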