airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the `SyncResult` object. Currently, this is supported only for Snowflake and BigQuery destinations.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
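
The same dataset can also be loaded into a dataframe. A minimal sketch, continuing from the example above and assuming pandas is installed:

```python
# Continuing from the example above...
df = dataset.to_pandas()  # Load the cached "users" records into a DataFrame
print(df.head())
```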
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import client_config, connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
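
Since `__all__` re-exports the key classes at the package level, they can be imported directly from `airbyte.cloud`; for example:

```python
from airbyte.cloud import CloudWorkspace, CloudConnection, SyncResult, JobStatusEnum
```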
@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    workspace_id: str
    client_id: SecretString | None = None
    client_secret: SecretString | None = None
    api_root: str = api_util.CLOUD_API_ROOT
    bearer_token: SecretString | None = None

    # Internal credentials object (set in __post_init__, excluded from __init__)
    _credentials: CloudClientConfig | None = field(default=None, init=False, repr=False)

    def __post_init__(self) -> None:
        """Validate and initialize credentials."""
        # Wrap secrets in SecretString if provided
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._credentials = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        resolved_api_root = resolve_cloud_api_url(api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                workspace_id=resolve_cloud_workspace_id(workspace_id),
                bearer_token=bearer_token,
                api_root=resolved_api_root,
            )

        # Fall back to client credentials
        return cls(
            workspace_id=resolve_cloud_workspace_id(workspace_id),
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
        )

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections in the workspace, optionally filtering by name.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication

Example with client credentials:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
```

Example with bearer token:

```python
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
```

CloudWorkspace(workspace_id: str, client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1', bearer_token: airbyte.secrets.SecretString | None = None)

workspace_id: str
client_id: airbyte.secrets.SecretString | None = None
client_secret: airbyte.secrets.SecretString | None = None
api_root: str = 'https://api.airbyte.com/v1'
bearer_token: airbyte.secrets.SecretString | None = None
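
Because api_root is a plain field with a Cloud default, pointing the same class at a self-managed (OSS or Enterprise) instance only requires overriding it. A minimal sketch, assuming a hypothetical host URL:

```python
from airbyte.cloud import CloudWorkspace

# The URL below is a placeholder; use your own deployment's public API root.
workspace = CloudWorkspace(
    workspace_id="your-workspace-id",
    client_id="...",
    client_secret="...",
    api_root="https://airbyte.example.com/api/public/v1",
)
```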
@classmethod
def from_env(cls, workspace_id: str | None = None, *, api_root: str | None = None) -> CloudWorkspace:

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)

Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

Arguments:
  • workspace_id: The workspace ID. If not provided, it will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, it will be resolved from the AIRBYTE_CLOUD_API_URL environment variable or will default to the Airbyte Cloud API.

Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.

Example:

```python
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
```
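
For illustration, the same factory can also be driven by variables set in-process; a sketch using the documented variable names (values are placeholders):

```python
import os

from airbyte.cloud import CloudWorkspace

os.environ["AIRBYTE_CLOUD_WORKSPACE_ID"] = "your-workspace-id"
os.environ["AIRBYTE_CLOUD_CLIENT_ID"] = "..."
os.environ["AIRBYTE_CLOUD_CLIENT_SECRET"] = "..."

workspace = CloudWorkspace.from_env()
```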
workspace_url: str | None

The web URL of the workspace.

def get_organization(self, *, raise_on_error: bool = True) -> airbyte.cloud.workspaces.CloudOrganization | None:

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
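
With workspace-scoped credentials, the non-raising form is often more convenient than a try/except. A sketch:

```python
org = workspace.get_organization(raise_on_error=False)
if org is not None:
    print(f"{org.organization_name} ({org.organization_id})")
else:
    print("Organization info unavailable (likely missing ORGANIZATION_READER permissions).")
```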
def connect(self) -> None:

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
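
A common pattern is a quick credentials check at startup. Since connect() raises on failure (the exact exception type is not specified here), a sketch might look like:

```python
try:
    workspace.connect()  # Prints the workspace URL on success
except Exception as ex:
    raise SystemExit(f"Workspace unreachable or credentials invalid: {ex}")
```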

def get_connection(self, connection_id: str) -> CloudConnection:

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.
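
Because all three getters are lazy, they are cheap to call up front; no API request is made until the returned object is actually used. A sketch with placeholder IDs:

```python
connection = workspace.get_connection(connection_id="...")
source = workspace.get_source(source_id="...")
destination = workspace.get_destination(destination_id="...")
# No network calls have happened yet; accessing properties triggers lazy loading.
```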

def deploy_source(self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
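
A sketch of a full deployment, assuming a locally configured source-faker connector (the config values are illustrative):

```python
import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 100},  # Illustrative connector config
)

cloud_source = workspace.deploy_source(
    name="my-faker-source",
    source=source,
    random_name_suffix=True,  # Sidestep collisions with the unique-name check
)
```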
def deploy_destination(self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
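
When passing a configuration dictionary instead of a Destination object, the destinationType key is required, as this sketch shows (the type and other keys are placeholders for your connector's config):

```python
cloud_destination = workspace.deploy_destination(
    name="my-destination",
    destination={
        "destinationType": "snowflake",  # Required when passing a plain dict
        # ...remaining connector-specific configuration values...
    },
)
```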
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
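
The safe-mode check keys off the resource name, so test resources are typically named accordingly. A sketch:

```python
# With the default safe_mode=True, the deletion succeeds only if the
# source's name contains "delete-me" or "deleteme" (case insensitive).
workspace.permanently_delete_source(source=cloud_source)

# Opting out of the check is possible, but irreversible deletions deserve caution:
workspace.permanently_delete_source(source="source-id", safe_mode=False)
```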
def permanently_delete_destination(self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection(self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
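
As an illustration, a sketch that wires up an already deployed source and destination; the IDs and stream names below are placeholders, and `selected_streams` must be non-empty:

# Create a connection between a deployed source and destination.
connection = workspace.deploy_connection(
    connection_name="my-sync",
    source="<source-id>",            # or a CloudSource object
    destination="<destination-id>",  # or a CloudDestination object
    selected_streams=["users", "orders"],  # required; empty raises an error
    table_prefix="raw_",  # optional
)
print(connection.connection_id)
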
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
584    def permanently_delete_connection(
585        self,
586        connection: str | CloudConnection,
587        *,
588        cascade_delete_source: bool = False,
589        cascade_delete_destination: bool = False,
590        safe_mode: bool = True,
591    ) -> None:
592        """Delete a deployed connection from the workspace.
593
594        Args:
595            connection: The connection ID or CloudConnection object to delete
596            cascade_delete_source: If True, also delete the source after deleting the connection
597            cascade_delete_destination: If True, also delete the destination after deleting
598                the connection
599            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
600                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
601                to cascade deletes.
602        """
603        if connection is None:
604            raise ValueError("No connection ID provided.")
605
606        if isinstance(connection, str):
607            connection = CloudConnection(
608                workspace=self,
609                connection_id=connection,
610            )
611
612        api_util.delete_connection(
613            connection_id=connection.connection_id,
614            connection_name=connection.name,
615            api_root=self.api_root,
616            workspace_id=self.workspace_id,
617            client_id=self.client_id,
618            client_secret=self.client_secret,
619            bearer_token=self.bearer_token,
620            safe_mode=safe_mode,
621        )
622
623        if cascade_delete_source:
624            self.permanently_delete_source(
625                source=connection.source_id,
626                safe_mode=safe_mode,
627            )
628        if cascade_delete_destination:
629            self.permanently_delete_destination(
630                destination=connection.destination_id,
631                safe_mode=safe_mode,
632            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
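
A sketch of a full teardown, assuming the connection and its connectors are named so the safe-mode check passes:

# Delete the connection and cascade to its source and destination.
workspace.permanently_delete_connection(
    connection="<connection-id>",  # or a CloudConnection object
    cascade_delete_source=True,
    cascade_delete_destination=True,
    safe_mode=True,  # also applied to the cascaded deletions
)
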
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
636    def list_connections(
637        self,
638        name: str | None = None,
639        *,
640        name_filter: Callable | None = None,
641    ) -> list[CloudConnection]:
642        """List connections by name in the workspace.
643
644        TODO: Add pagination support
645        """
646        connections = api_util.list_connections(
647            api_root=self.api_root,
648            workspace_id=self.workspace_id,
649            name=name,
650            name_filter=name_filter,
651            client_id=self.client_id,
652            client_secret=self.client_secret,
653            bearer_token=self.bearer_token,
654        )
655        return [
656            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
657                workspace=self,
658                connection_response=connection,
659            )
660            for connection in connections
661            if name is None or connection.name == name
662        ]

List connections by name in the workspace.

TODO: Add pagination support
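
A sketch contrasting exact-name lookup with a custom predicate; the signature does not pin down the callable's contract, so the assumption here is that `name_filter` receives the connection name as a string:

# Exact-match lookup vs. predicate filtering (filter contract assumed).
exact = workspace.list_connections(name="my-sync")
prefixed = workspace.list_connections(
    name_filter=lambda n: n.startswith("prod-"),  # assumed to receive the name
)
for connection in exact + prefixed:
    print(connection.connection_id, connection.name)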

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
664    def list_sources(
665        self,
666        name: str | None = None,
667        *,
668        name_filter: Callable | None = None,
669    ) -> list[CloudSource]:
670        """List all sources in the workspace.
671
672        TODO: Add pagination support
673        """
674        sources = api_util.list_sources(
675            api_root=self.api_root,
676            workspace_id=self.workspace_id,
677            name=name,
678            name_filter=name_filter,
679            client_id=self.client_id,
680            client_secret=self.client_secret,
681            bearer_token=self.bearer_token,
682        )
683        return [
684            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
685                workspace=self,
686                source_response=source,
687            )
688            for source in sources
689            if name is None or source.name == name
690        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
692    def list_destinations(
693        self,
694        name: str | None = None,
695        *,
696        name_filter: Callable | None = None,
697    ) -> list[CloudDestination]:
698        """List all destinations in the workspace.
699
700        TODO: Add pagination support
701        """
702        destinations = api_util.list_destinations(
703            api_root=self.api_root,
704            workspace_id=self.workspace_id,
705            name=name,
706            name_filter=name_filter,
707            client_id=self.client_id,
708            client_secret=self.client_secret,
709            bearer_token=self.bearer_token,
710        )
711        return [
712            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
713                workspace=self,
714                destination_response=destination,
715            )
716            for destination in destinations
717            if name is None or destination.name == name
718        ]

List all destinations in the workspace.

TODO: Add pagination support
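
A minimal sketch enumerating deployed connectors, assuming an existing `workspace` object:

# List every deployed source and destination in the workspace.
for source in workspace.list_sources():
    print("source:", source.connector_id, source.name)
for destination in workspace.list_destinations():
    print("destination:", destination.connector_id, destination.name)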

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
720    def publish_custom_source_definition(
721        self,
722        name: str,
723        *,
724        manifest_yaml: dict[str, Any] | Path | str | None = None,
725        docker_image: str | None = None,
726        docker_tag: str | None = None,
727        unique: bool = True,
728        pre_validate: bool = True,
729        testing_values: dict[str, Any] | None = None,
730    ) -> CustomCloudSourceDefinition:
731        """Publish a custom source connector definition.
732
733        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
734        and docker_tag (for Docker connectors), but not both.
735
736        Args:
737            name: Display name for the connector definition
738            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
739            docker_image: Docker repository (e.g., 'airbyte/source-custom')
740            docker_tag: Docker image tag (e.g., '1.0.0')
741            unique: Whether to enforce name uniqueness
742            pre_validate: Whether to validate manifest client-side (YAML only)
743            testing_values: Optional configuration values to use for testing in the
744                Connector Builder UI. If provided, these values are stored as the complete
745                testing values object for the connector builder project (replaces any existing
746                values), allowing immediate test read operations.
747
748        Returns:
749            CustomCloudSourceDefinition object representing the created definition
750
751        Raises:
752            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
753            AirbyteDuplicateResourcesError: If unique=True and name already exists
754        """
755        is_yaml = manifest_yaml is not None
756        is_docker = docker_image is not None
757
758        if is_yaml == is_docker:
759            raise exc.PyAirbyteInputError(
760                message=(
761                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
762                    "docker_image + docker_tag (for Docker connectors), but not both"
763                ),
764                context={
765                    "manifest_yaml_provided": is_yaml,
766                    "docker_image_provided": is_docker,
767                },
768            )
769
770        if is_docker and docker_tag is None:
771            raise exc.PyAirbyteInputError(
772                message="docker_tag is required when docker_image is specified",
773                context={"docker_image": docker_image},
774            )
775
776        if unique:
777            existing = self.list_custom_source_definitions(
778                definition_type="yaml" if is_yaml else "docker",
779            )
780            if any(d.name == name for d in existing):
781                raise exc.AirbyteDuplicateResourcesError(
782                    resource_type="custom_source_definition",
783                    resource_name=name,
784                )
785
786        if is_yaml:
787            manifest_dict: dict[str, Any]
788            if isinstance(manifest_yaml, Path):
789                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
790            elif isinstance(manifest_yaml, str):
791                manifest_dict = yaml.safe_load(manifest_yaml)
792            elif manifest_yaml is not None:
793                manifest_dict = manifest_yaml
794            else:
795                raise exc.PyAirbyteInputError(
796                    message="manifest_yaml is required for YAML connectors",
797                    context={"name": name},
798                )
799
800            if pre_validate:
801                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
802
803            result = api_util.create_custom_yaml_source_definition(
804                name=name,
805                workspace_id=self.workspace_id,
806                manifest=manifest_dict,
807                api_root=self.api_root,
808                client_id=self.client_id,
809                client_secret=self.client_secret,
810                bearer_token=self.bearer_token,
811            )
812            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
813                self, result
814            )
815
816            # Set testing values if provided
817            if testing_values is not None:
818                custom_definition.set_testing_values(testing_values)
819
820            return custom_definition
821
822        raise NotImplementedError(
823            "Docker custom source definitions are not yet supported. "
824            "Only YAML manifest-based custom sources are currently available."
825        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
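
A sketch of publishing a YAML-manifest connector; the manifest path and testing values are placeholders:

# Publish a low-code YAML connector definition from a manifest file.
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="my-custom-source",
    manifest_yaml=Path("manifest.yaml"),  # a dict or YAML string also works
    unique=True,        # raises AirbyteDuplicateResourcesError on a name clash
    pre_validate=True,  # validate the manifest client-side before upload
    testing_values={"api_key": "..."},  # stored for Connector Builder test reads
)
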
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
827    def list_custom_source_definitions(
828        self,
829        *,
830        definition_type: Literal["yaml", "docker"],
831    ) -> list[CustomCloudSourceDefinition]:
832        """List custom source connector definitions.
833
834        Args:
835            definition_type: Connector type to list ("yaml" or "docker"). Required.
836
837        Returns:
838            List of CustomCloudSourceDefinition objects matching the specified type
839        """
840        if definition_type == "yaml":
841            yaml_definitions = api_util.list_custom_yaml_source_definitions(
842                workspace_id=self.workspace_id,
843                api_root=self.api_root,
844                client_id=self.client_id,
845                client_secret=self.client_secret,
846                bearer_token=self.bearer_token,
847            )
848            return [
849                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
850                for d in yaml_definitions
851            ]
852
853        raise NotImplementedError(
854            "Docker custom source definitions are not yet supported. "
855            "Only YAML manifest-based custom sources are currently available."
856        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
858    def get_custom_source_definition(
859        self,
860        definition_id: str,
861        *,
862        definition_type: Literal["yaml", "docker"],
863    ) -> CustomCloudSourceDefinition:
864        """Get a specific custom source definition by ID.
865
866        Args:
867            definition_id: The definition ID
868            definition_type: Connector type ("yaml" or "docker"). Required.
869
870        Returns:
871            CustomCloudSourceDefinition object
872        """
873        if definition_type == "yaml":
874            result = api_util.get_custom_yaml_source_definition(
875                workspace_id=self.workspace_id,
876                definition_id=definition_id,
877                api_root=self.api_root,
878                client_id=self.client_id,
879                client_secret=self.client_secret,
880                bearer_token=self.bearer_token,
881            )
882            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object
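
A sketch combining the two lookups above; the definition ID is a placeholder, and only the "yaml" type is currently supported:

# List YAML definitions, then re-fetch one by ID.
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)

definition = workspace.get_custom_source_definition(
    "<definition-id>",
    definition_type="yaml",
)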

class CloudConnection:
 21class CloudConnection:  # noqa: PLR0904  # Too many public methods
 22    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 23
 24    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 25    """
 26
 27    def __init__(
 28        self,
 29        workspace: CloudWorkspace,
 30        connection_id: str,
 31        source: str | None = None,
 32        destination: str | None = None,
 33    ) -> None:
 34        """It is not recommended to create a `CloudConnection` object directly.
 35
 36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 37        """
 38        self.connection_id = connection_id
 39        """The ID of the connection."""
 40
 41        self.workspace = workspace
 42        """The workspace that the connection belongs to."""
 43
 44        self._source_id = source
 45        """The ID of the source."""
 46
 47        self._destination_id = destination
 48        """The ID of the destination."""
 49
 50        self._connection_info: ConnectionResponse | None = None
 51        """The connection info object. (Cached.)"""
 52
 53        self._cloud_source_object: CloudSource | None = None
 54        """The source object. (Cached.)"""
 55
 56        self._cloud_destination_object: CloudDestination | None = None
 57        """The destination object. (Cached.)"""
 58
 59    def _fetch_connection_info(
 60        self,
 61        *,
 62        force_refresh: bool = False,
 63        verify: bool = True,
 64    ) -> ConnectionResponse:
 65        """Fetch and cache connection info from the API.
 66
 67        By default, this method will only fetch from the API if connection info is not
 68        already cached. It also verifies that the connection belongs to the expected
 69        workspace unless verification is explicitly disabled.
 70
 71        Args:
 72            force_refresh: If True, always fetch from the API even if cached.
 73                If False (default), only fetch if not already cached.
 74            verify: If True (default), verify that the connection is valid (e.g., that
 75                the workspace_id matches this object's workspace). Raises an error if
 76                validation fails.
 77
 78        Returns:
 79            The ConnectionResponse from the API.
 80
 81        Raises:
 82            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 83                workspace_id doesn't match the expected workspace.
 84            AirbyteMissingResourceError: If the connection doesn't exist.
 85        """
 86        if not force_refresh and self._connection_info is not None:
 87            # Use cached info, but still verify if requested
 88            if verify:
 89                self._verify_workspace_match(self._connection_info)
 90            return self._connection_info
 91
 92        # Fetch from API
 93        connection_info = api_util.get_connection(
 94            workspace_id=self.workspace.workspace_id,
 95            connection_id=self.connection_id,
 96            api_root=self.workspace.api_root,
 97            client_id=self.workspace.client_id,
 98            client_secret=self.workspace.client_secret,
 99            bearer_token=self.workspace.bearer_token,
100        )
101
102        # Cache the result first (before verification may raise)
103        self._connection_info = connection_info
104
105        # Verify if requested
106        if verify:
107            self._verify_workspace_match(connection_info)
108
109        return connection_info
110
111    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
112        """Verify that the connection belongs to the expected workspace.
113
114        Raises:
115            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
116        """
117        if connection_info.workspace_id != self.workspace.workspace_id:
118            raise AirbyteWorkspaceMismatchError(
119                resource_type="connection",
120                resource_id=self.connection_id,
121                workspace=self.workspace,
122                expected_workspace_id=self.workspace.workspace_id,
123                actual_workspace_id=connection_info.workspace_id,
124                message=(
125                    f"Connection '{self.connection_id}' belongs to workspace "
126                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
127                ),
128            )
129
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True
146
147    @classmethod
148    def _from_connection_response(
149        cls,
150        workspace: CloudWorkspace,
151        connection_response: ConnectionResponse,
152    ) -> CloudConnection:
153        """Create a CloudConnection from a ConnectionResponse."""
154        result = cls(
155            workspace=workspace,
156            connection_id=connection_response.connection_id,
157            source=connection_response.source_id,
158            destination=connection_response.destination_id,
159        )
160        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
161        return result
162
163    # Properties
164
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name
175
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id
186
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object
198
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id
209
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object
221
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]
229
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""
237
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
242
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"
247
248    # Run Sync
249
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result
279
280    def __repr__(self) -> str:
281        """String representation of the connection."""
282        return (
283            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
284            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
285        )
286
287    # Logs
288
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]
337
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )
364
365    # Artifacts
366
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])
388
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")
408
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self
428
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self
448
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self
473
474    # Enable/Disable
475
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
488
489    @enabled.setter
490    def enabled(self, value: bool) -> None:
491        """Set the enabled status of the connection.
492
493        Args:
494            value: True to enable (set status to 'active'), False to disable
495                (set status to 'inactive').
496        """
497        self.set_enabled(enabled=value)
498
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response
544
545    # Scheduling
546
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response
574
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response
592
593    # Deletions
594
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
27    def __init__(
28        self,
29        workspace: CloudWorkspace,
30        connection_id: str,
31        source: str | None = None,
32        destination: str | None = None,
33    ) -> None:
34        """It is not recommended to create a `CloudConnection` object directly.
35
36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
37        """
38        self.connection_id = connection_id
39        """The ID of the connection."""
40
41        self.workspace = workspace
42        """The workspace that the connection belongs to."""
43
44        self._source_id = source
45        """The ID of the source."""
46
47        self._destination_id = destination
48        """The ID of the destination."""
49
50        self._connection_info: ConnectionResponse | None = None
51        """The connection info object. (Cached.)"""
52
53        self._cloud_source_object: CloudSource | None = None
54        """The source object. (Cached.)"""
55
56        self._cloud_destination_object: CloudDestination | None = None
57        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
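
A sketch of validating a connection handle up front; the import path for the exception classes is an assumption:

# Validate the handle before use (exception import path assumed).
from airbyte.exceptions import (
    AirbyteMissingResourceError,
    AirbyteWorkspaceMismatchError,
)

try:
    connection.check_is_valid()
except AirbyteMissingResourceError:
    print("Connection does not exist.")
except AirbyteWorkspaceMismatchError:
    print("Connection belongs to a different workspace.")
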
name: str | None
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object

Get the source object.

destination_id: str
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object

Get the destination object.
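
A short sketch of the lazily loaded identifiers and connector objects above, assuming an existing `connection` object:

# IDs are resolved from cached connection info on first access.
print(connection.name)
print(connection.source_id, connection.destination_id)
print(connection.source.connector_id)       # CloudSource object
print(connection.destination.connector_id)  # CloudDestination object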

stream_names: list[str]
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result

Run a sync.
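
A sketch of the non-blocking form, which triggers the job and waits explicitly later with a longer timeout:

# Fire-and-forget, then block with custom wait behavior.
sync_result = connection.run_sync(wait=False)
# ... do other work while the job runs ...
sync_result.wait_for_completion(
    wait_timeout=1800,   # seconds
    raise_failure=True,  # raise if the job fails
    raise_timeout=True,  # raise if the timeout elapses first
)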

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True) -> list[SyncResult]:
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
Returns:

A list of SyncResult objects representing the sync jobs.
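
A sketch of paging through job history newest-first, assuming each SyncResult exposes the `job_id` it was constructed with:

# Walk the job history 20 jobs at a time.
page_size = 20
offset = 0
while True:
    page = connection.get_previous_sync_logs(limit=page_size, offset=offset)
    for sync_result in page:
        print(sync_result.job_id)
    if len(page) < page_size:
        break  # short page means we reached the end
    offset += page_size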

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
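
A minimal sketch handling the no-jobs case:

# Inspect the most recent job, if any.
sync_result = connection.get_sync_result()  # None if no jobs have run yet
if sync_result is None:
    print("No previous sync jobs found.")
else:
    print(sync_result.get_job_status())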

def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])

Get the connection state artifacts.

Returns the persisted state for this connection, which can be used when debugging incremental syncs.

Uses the Config API endpoint: POST /v1/state/get

Returns:

List of state objects for each stream, or None if no state is set.
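
A sketch for debugging incremental syncs; the per-stream state dicts are printed raw, since their exact shape depends on the connector:

# Dump persisted per-stream state, if any exists.
states = connection.get_state_artifacts()
if states is None:
    print("No state persisted yet.")
else:
    for stream_state in states:
        print(stream_state)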

def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.
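
A sketch summarizing configured streams; the "streams"/"stream"/"config" keys follow the Config API catalog shape and should be treated as assumptions if your API version differs:

# Print each configured stream's name and sync mode (key names assumed).
catalog = connection.get_catalog_artifact()
if catalog:
    for entry in catalog.get("streams", []):
        stream = entry.get("stream", {})
        config = entry.get("config", {})
        print(stream.get("name"), config.get("syncMode"))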

def rename(self, name: str) -> CloudConnection:
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
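
A sketch of narrowing a connection to two streams (stream names are illustrative; note that the new selection replaces the existing one):

```python
# Destructive: this selection replaces the previous one entirely
connection.set_selected_streams(["users", "orders"])
```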

enabled: bool
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.
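
For example (note that every read of `enabled` triggers a fresh API call):

```python
# Fresh status is fetched from the API on each access
if not connection.enabled:
    print("Connection is currently paused.")
```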

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
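
A sketch pairing this method with the `enabled` property above (with the default `ignore_noop=True`, the status check is optional, since a no-op call returns silently):

```python
# Pause the connection only if it is currently active
if connection.enabled:
    connection.set_enabled(enabled=False)
```
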
def set_schedule(self, cron_expression: str) -> None:
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.
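
A sketch of switching to manual scheduling and then triggering a one-off sync (`run_sync()` is shown in the module examples above):

```python
# Disable scheduled syncs, then trigger one manually
connection.set_manual_schedule()
sync_result = connection.run_sync()
```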

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
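
For example, deleting the connection together with its source while keeping the destination:

```python
# Irreversible: deletes the connection and, here, its source as well
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=False,
)
```
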
@dataclass
class CloudClientConfig:
 57@dataclass
 58class CloudClientConfig:
 59    """Client configuration for Airbyte Cloud API.
 60
 61    This class encapsulates the authentication and API configuration needed to connect
 62    to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
 63    exclusive authentication methods:
 64
 65    1. OAuth2 client credentials flow (client_id + client_secret)
 66    2. Bearer token authentication
 67
 68    Exactly one authentication method must be provided. Providing both or neither
 69    will raise a validation error.
 70
 71    Attributes:
 72        client_id: OAuth2 client ID for client credentials flow.
 73        client_secret: OAuth2 client secret for client credentials flow.
 74        bearer_token: Pre-generated bearer token for direct authentication.
 75        api_root: The API root URL. Defaults to Airbyte Cloud API.
 76    """
 77
 78    client_id: SecretString | None = None
 79    """OAuth2 client ID for client credentials authentication."""
 80
 81    client_secret: SecretString | None = None
 82    """OAuth2 client secret for client credentials authentication."""
 83
 84    bearer_token: SecretString | None = None
 85    """Bearer token for direct authentication (alternative to client credentials)."""
 86
 87    api_root: str = api_util.CLOUD_API_ROOT
 88    """The API root URL. Defaults to Airbyte Cloud API."""
 89
 90    def __post_init__(self) -> None:
 91        """Validate credentials and ensure secrets are properly wrapped."""
 92        # Wrap secrets in SecretString if they aren't already
 93        if self.client_id is not None:
 94            self.client_id = SecretString(self.client_id)
 95        if self.client_secret is not None:
 96            self.client_secret = SecretString(self.client_secret)
 97        if self.bearer_token is not None:
 98            self.bearer_token = SecretString(self.bearer_token)
 99
100        # Validate mutual exclusivity
101        has_client_credentials = self.client_id is not None or self.client_secret is not None
102        has_bearer_token = self.bearer_token is not None
103
104        if has_client_credentials and has_bearer_token:
105            raise PyAirbyteInputError(
106                message="Cannot use both client credentials and bearer token authentication.",
107                guidance=(
108                    "Provide either client_id and client_secret together, "
109                    "or bearer_token alone, but not both."
110                ),
111            )
112
113        if has_client_credentials and (self.client_id is None or self.client_secret is None):
114            # If using client credentials, both must be provided
115            raise PyAirbyteInputError(
116                message="Incomplete client credentials.",
117                guidance=(
118                    "When using client credentials authentication, "
119                    "both client_id and client_secret must be provided."
120                ),
121            )
122
123        if not has_client_credentials and not has_bearer_token:
124            raise PyAirbyteInputError(
125                message="No authentication credentials provided.",
126                guidance=(
127                    "Provide either client_id and client_secret together for OAuth2 "
128                    "client credentials flow, or bearer_token for direct authentication."
129                ),
130            )
131
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None
136
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None
141
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Client configuration for Airbyte Cloud API.

This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:

  1. OAuth2 client credentials flow (client_id + client_secret)
  2. Bearer token authentication

Exactly one authentication method must be provided. Providing both or neither will raise a validation error.

Attributes:
  • client_id: OAuth2 client ID for client credentials flow.
  • client_secret: OAuth2 client secret for client credentials flow.
  • bearer_token: Pre-generated bearer token for direct authentication.
  • api_root: The API root URL. Defaults to Airbyte Cloud API.
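
A minimal construction sketch for each authentication method, using `ab.get_secret` as in the module examples above (the secret names are illustrative; passing both kinds of credentials raises `PyAirbyteInputError`):

```python
import airbyte as ab
from airbyte.cloud import CloudClientConfig

# Option 1: OAuth2 client credentials flow
config = CloudClientConfig(
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Option 2: pre-generated bearer token (mutually exclusive with option 1)
config = CloudClientConfig(
    bearer_token=ab.get_secret("AIRBYTE_CLOUD_BEARER_TOKEN"),
)
```
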
CloudClientConfig( client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, bearer_token: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1')
client_id: airbyte.secrets.SecretString | None = None

OAuth2 client ID for client credentials authentication.

client_secret: airbyte.secrets.SecretString | None = None

OAuth2 client secret for client credentials authentication.

bearer_token: airbyte.secrets.SecretString | None = None

Bearer token for direct authentication (alternative to client credentials).

api_root: str = 'https://api.airbyte.com/v1'

The API root URL. Defaults to Airbyte Cloud API.

uses_bearer_token: bool
132    @property
133    def uses_bearer_token(self) -> bool:
134        """Return True if using bearer token authentication."""
135        return self.bearer_token is not None

Return True if using bearer token authentication.

uses_client_credentials: bool
137    @property
138    def uses_client_credentials(self) -> bool:
139        """Return True if using client credentials authentication."""
140        return self.client_id is not None and self.client_secret is not None

Return True if using client credentials authentication.

@classmethod
def from_env( cls, *, api_root: str | None = None) -> CloudClientConfig:
142    @classmethod
143    def from_env(
144        cls,
145        *,
146        api_root: str | None = None,
147    ) -> CloudClientConfig:
148        """Create CloudClientConfig from environment variables.
149
150        This factory method resolves credentials from environment variables,
151        providing a convenient way to create credentials without explicitly
152        passing secrets.
153
154        Environment variables used:
155            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
156            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
157            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
158            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
159
160        The method will first check for a bearer token. If not found, it will
161        attempt to use client credentials.
162
163        Args:
164            api_root: The API root URL. If not provided, will be resolved from
165                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
166                the Airbyte Cloud API.
167
168        Returns:
169            A CloudClientConfig instance configured with credentials from the environment.
170
171        Raises:
172            PyAirbyteSecretNotFoundError: If required credentials are not found in
173                the environment.
174        """
175        resolved_api_root = resolve_cloud_api_url(api_root)
176
177        # Try bearer token first
178        bearer_token = resolve_cloud_bearer_token()
179        if bearer_token:
180            return cls(
181                bearer_token=bearer_token,
182                api_root=resolved_api_root,
183            )
184
185        # Fall back to client credentials
186        return cls(
187            client_id=resolve_cloud_client_id(),
188            client_secret=resolve_cloud_client_secret(),
189            api_root=resolved_api_root,
190        )

Create CloudClientConfig from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.

Environment variables used:
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).

The method will first check for a bearer token. If not found, it will attempt to use client credentials.

Arguments:
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
Returns:

A CloudClientConfig instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
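
For example, assuming the environment variables listed above are already exported:

```python
from airbyte.cloud import CloudClientConfig

# Resolves AIRBYTE_CLOUD_BEARER_TOKEN first, then falls back to
# AIRBYTE_CLOUD_CLIENT_ID / AIRBYTE_CLOUD_CLIENT_SECRET.
config = CloudClientConfig.from_env()
print(config.uses_bearer_token, config.api_root)
```
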
@dataclass
class SyncResult:
218@dataclass
219class SyncResult:
220    """The result of a sync operation.
221
222    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
223    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
224    """
225
226    workspace: CloudWorkspace
227    connection: CloudConnection
228    job_id: int
229    table_name_prefix: str = ""
230    table_name_suffix: str = ""
231    _latest_job_info: JobResponse | None = None
232    _connection_response: ConnectionResponse | None = None
233    _cache: CacheBase | None = None
234    _job_with_attempts_info: dict[str, Any] | None = None
235
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"
247
248    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
249        """Return connection info for the sync job."""
250        if self._connection_response and not force_refresh:
251            return self._connection_response
252
253        self._connection_response = api_util.get_connection(
254            workspace_id=self.workspace.workspace_id,
255            api_root=self.workspace.api_root,
256            connection_id=self.connection.connection_id,
257            client_id=self.workspace.client_id,
258            client_secret=self.workspace.client_secret,
259            bearer_token=self.workspace.bearer_token,
260        )
261        return self._connection_response
262
263    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
264        """Return the destination configuration for the sync job."""
265        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
266        destination_response = api_util.get_destination(
267            destination_id=connection_info.destination_id,
268            api_root=self.workspace.api_root,
269            client_id=self.workspace.client_id,
270            client_secret=self.workspace.client_secret,
271            bearer_token=self.workspace.bearer_token,
272        )
273        return asdict(destination_response.configuration)
274
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES
278
279    def get_job_status(self) -> JobStatusEnum:
280        """Check if the sync job is still running."""
281        return self._fetch_latest_job_info().status
282
283    def _fetch_latest_job_info(self) -> JobResponse:
284        """Return the job info for the sync job."""
285        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
286            return self._latest_job_info
287
288        self._latest_job_info = api_util.get_job_info(
289            job_id=self.job_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            bearer_token=self.workspace.bearer_token,
294        )
295        return self._latest_job_info
296
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of records processed."""
300        return self._fetch_latest_job_info().bytes_synced or 0
301
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records processed."""
305        return self._fetch_latest_job_info().rows_synced or 0
306
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise
326
327    def _fetch_job_with_attempts(self) -> dict[str, Any]:
328        """Fetch job info with attempts from Config API using lazy loading pattern."""
329        if self._job_with_attempts_info is not None:
330            return self._job_with_attempts_info
331
332        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
333            api_root=self.workspace.api_root,
334            path="/jobs/get",
335            json={
336                "id": self.job_id,
337            },
338            client_id=self.workspace.client_id,
339            client_secret=self.workspace.client_secret,
340            bearer_token=self.workspace.bearer_token,
341        )
342        return self._job_with_attempts_info
343
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]
359
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )
385
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
417
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache
426
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()
430
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
434
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)
441
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )
456
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()
461
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name
466
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the set of stream names."""
470        return self.connection.stream_names
471
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)
483
484    class _SyncResultStreams(Mapping[str, CachedDataset]):
485        """A mapping of stream names to cached datasets."""
486
487        def __init__(
488            self,
489            parent: SyncResult,
490            /,
491        ) -> None:
492            self.parent: SyncResult = parent
493
494        def __getitem__(self, key: str) -> CachedDataset:
495            return self.parent.get_dataset(stream_name=key)
496
497        def __iter__(self) -> Iterator[str]:
498            return iter(self.parent.stream_names)
499
500        def __len__(self) -> int:
501            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:
275    def is_job_complete(self) -> bool:
276        """Check if the sync job is complete."""
277        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
279    def get_job_status(self) -> JobStatusEnum:
280        """Check if the sync job is still running."""
281        return self._fetch_latest_job_info().status

Return the status of the sync job.

bytes_synced: int
297    @property
298    def bytes_synced(self) -> int:
299        """Return the number of records processed."""
300        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
302    @property
303    def records_synced(self) -> int:
304        """Return the number of records processed."""
305        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.
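
For example, reporting basic job metrics from the `sync_result` object created in the module examples above:

```python
status = sync_result.get_job_status()
if sync_result.is_job_complete():
    print(
        f"Job {sync_result.job_id} finished as {status}: "
        f"{sync_result.records_synced} records, {sync_result.bytes_synced} bytes"
    )
```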

start_time: datetime.datetime
307    @property
308    def start_time(self) -> datetime:
309        """Return the start time of the sync job in UTC."""
310        try:
311            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
312        except (ValueError, TypeError) as e:
313            if "Invalid isoformat string" in str(e):
314                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
315                    api_root=self.workspace.api_root,
316                    path="/jobs/get",
317                    json={"id": self.job_id},
318                    client_id=self.workspace.client_id,
319                    client_secret=self.workspace.client_secret,
320                    bearer_token=self.workspace.bearer_token,
321                )
322                raw_start_time = job_info_raw.get("startTime")
323                if raw_start_time:
324                    return ab_datetime_parse(raw_start_time)
325            raise

Return the start time of the sync job in UTC.

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
344    def get_attempts(self) -> list[SyncAttempt]:
345        """Return a list of attempts for this sync job."""
346        job_with_attempts = self._fetch_job_with_attempts()
347        attempts_data = job_with_attempts.get("attempts", [])
348
349        return [
350            SyncAttempt(
351                workspace=self.workspace,
352                connection=self.connection,
353                job_id=self.job_id,
354                attempt_number=i,
355                _attempt_data=attempt_data,
356            )
357            for i, attempt_data in enumerate(attempts_data, start=0)
358        ]

Return a list of attempts for this sync job.
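
For example (only `attempt_number` is printed here, since it is the one attribute visible in the constructor call above):

```python
for attempt in sync_result.get_attempts():
    print(f"Attempt #{attempt.attempt_number}")
```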

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
360    def raise_failure_status(
361        self,
362        *,
363        refresh_status: bool = False,
364    ) -> None:
365        """Raise an exception if the sync job failed.
366
367        By default, this method will use the latest status available. If you want to refresh the
368        status before checking for failure, set `refresh_status=True`. If the job has failed, this
369        method will raise an `AirbyteConnectionSyncError`.
370
371        Otherwise, do nothing.
372        """
373        if not refresh_status and self._latest_job_info:
374            latest_status = self._latest_job_info.status
375        else:
376            latest_status = self.get_job_status()
377
378        if latest_status in FAILED_STATUSES:
379            raise AirbyteConnectionSyncError(
380                workspace=self.workspace,
381                connection_id=self.connection.connection_id,
382                job_id=self.job_id,
383                job_status=self.get_job_status(),
384            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
386    def wait_for_completion(
387        self,
388        *,
389        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
390        raise_timeout: bool = True,
391        raise_failure: bool = False,
392    ) -> JobStatusEnum:
393        """Wait for a job to finish running."""
394        start_time = time.time()
395        while True:
396            latest_status = self.get_job_status()
397            if latest_status in FINAL_STATUSES:
398                if raise_failure:
399                    # No-op if the job succeeded or is still running:
400                    self.raise_failure_status()
401
402                return latest_status
403
404            if time.time() - start_time > wait_timeout:
405                if raise_timeout:
406                    raise AirbyteConnectionSyncTimeoutError(
407                        workspace=self.workspace,
408                        connection_id=self.connection.connection_id,
409                        job_id=self.job_id,
410                        job_status=latest_status,
411                        timeout=wait_timeout,
412                    )
413
414                return latest_status  # This will be a non-final status
415
416            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
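
A typical blocking pattern (the one-hour timeout is illustrative):

```python
# Block until the job reaches a final status; raise on failure
final_status = sync_result.wait_for_completion(
    wait_timeout=3600,
    raise_failure=True,
)
print(final_status)
```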

def get_sql_cache(self) -> airbyte.caches.CacheBase:
418    def get_sql_cache(self) -> CacheBase:
419        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
420        if self._cache:
421            return self._cache
422
423        destination_configuration = self._get_destination_configuration()
424        self._cache = destination_to_cache(destination_configuration=destination_configuration)
425        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
427    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
428        """Return a SQL Engine for querying a SQL-based destination."""
429        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
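
A sketch of querying the destination directly (the `users` stream name is illustrative, and depending on the destination the table may need schema qualification via `get_sql_schema_name()`):

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    # Count rows in the stream's destination table
    count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(count)
```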

def get_sql_table_name(self, stream_name: str) -> str:
431    def get_sql_table_name(self, stream_name: str) -> str:
432        """Return the SQL table name of the named stream."""
433        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
435    def get_sql_table(
436        self,
437        stream_name: str,
438    ) -> sqlalchemy.Table:
439        """Return a SQLAlchemy table object for the named stream."""
440        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
442    def get_dataset(self, stream_name: str) -> CachedDataset:
443        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
444
445        This can be used to read and analyze the data in a SQL-based destination.
446
447        TODO: In a future iteration, we can consider providing stream configuration information
448              (catalog information) to the `CachedDataset` object via the "Get stream properties"
449              API: https://reference.airbyte.com/reference/getstreamproperties
450        """
451        return CachedDataset(
452            self.get_sql_cache(),
453            stream_name=stream_name,
454            stream_configuration=False,  # Don't look for stream configuration in cache.
455        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:
457    def get_sql_database_name(self) -> str:
458        """Return the SQL database name."""
459        cache = self.get_sql_cache()
460        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
462    def get_sql_schema_name(self) -> str:
463        """Return the SQL schema name."""
464        cache = self.get_sql_cache()
465        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
467    @property
468    def stream_names(self) -> list[str]:
469        """Return the set of stream names."""
470        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
472    @final
473    @property
474    def streams(
475        self,
476    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
477        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
478
479        This is a convenience wrapper around the `stream_names`
480        property and `get_dataset()` method.
481        """
482        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
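
For example, treating `streams` as a read-only mapping (the `users` stream name is illustrative):

```python
# The mapping's keys are the connection's stream names
print(list(sync_result.streams))

# Indexing is equivalent to sync_result.get_dataset("users")
users_dataset = sync_result.streams["users"]
```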

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
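
Since the enum subclasses `str`, members compare equal to their plain string values. For example:

```python
from airbyte.cloud.constants import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:  # also compares equal to "succeeded"
    print("Sync succeeded.")
```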