airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Sync Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently this is supported in Snowflake and BigQuery only.

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace(
    workspace_id: str,
    client_id: airbyte.secrets.SecretString,
    client_secret: airbyte.secrets.SecretString,
    api_root: str = 'https://api.airbyte.com/v1',
)
workspace_id: str
client_id: airbyte.secrets.SecretString
client_secret: airbyte.secrets.SecretString
api_root: str = 'https://api.airbyte.com/v1'
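
Example (a sketch): to target a self-managed OSS or Enterprise instance, override `api_root` with your deployment's public API URL (the URL below is a placeholder):

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    api_root="https://airbyte.internal.example.com/api/public/v1",  # Placeholder URL
)
```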
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

organization_id: str | None
    @property
    def organization_id(self) -> str | None:
        """The ID of the organization this workspace belongs to.

        This value is cached after the first lookup.
        """
        return self._organization_info.get("organizationId")

The ID of the organization this workspace belongs to.

This value is cached after the first lookup.

organization_name: str | None
    @property
    def organization_name(self) -> str | None:
        """The name of the organization this workspace belongs to.

        This value is cached after the first lookup.
        """
        return self._organization_info.get("organizationName")

The name of the organization this workspace belongs to.

This value is cached after the first lookup.

def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
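
For example, a quick startup check (since `connect()` raises on failure, there is no return value to inspect):

```python
workspace.connect()  # Raises if unreachable or credentials are invalid; prints the workspace URL on success
```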

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
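
For illustration, a minimal sketch deploying a locally configured connector (the connector name and config values are placeholders, not a tested setup):

```python
import airbyte as ab

# Configure a connector locally, then deploy it to the workspace
source = ab.get_source("source-faker", config={"count": 100})
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # Avoids duplicate-name errors on repeated runs
)
```
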
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
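
Because a plain `dict` must carry its own `destinationType`, a hedged sketch of the dictionary form looks like this (the config keys are illustrative):

```python
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",  # Required when passing a dict
        "destination_path": "/local/airbyte.duckdb",  # Illustrative config key
    },
)
```
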
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            safe_mode=safe_mode,
        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
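
A short sketch of how `safe_mode` interacts with naming (the objects are placeholders from earlier examples):

```python
# With the default safe_mode=True, deletion succeeds only if the source's
# name contains "delete-me" or "deleteme" (case insensitive):
workspace.permanently_delete_source(source=cloud_source)

# For a source whose name lacks that marker, safe mode must be disabled:
workspace.permanently_delete_source(source=cloud_source, safe_mode=False)
```
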
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the destination ID `str` or a deployed `CloudDestination` object.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            safe_mode=safe_mode,
        )

Delete a deployed destination from the workspace.

You can pass either the destination ID str or a deployed CloudDestination object.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
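
Putting the pieces together, a sketch that links the resources deployed above (stream names depend on your source's catalog):

```python
connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,
    destination=cloud_destination,
    selected_streams=["users", "products"],  # Must be non-empty
    table_prefix="faker_",
)
```
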
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
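
For teardown, a sketch that removes the connection and both of its endpoints in one call (each resource's name is subject to `safe_mode` unless it is disabled):

```python
workspace.permanently_delete_connection(
    connection,
    cascade_delete_source=True,
    cascade_delete_destination=True,
)
```
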
def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

List connections by name in the workspace.

TODO: Add pagination support
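
As a sketch, `name` requires an exact match while `name_filter` allows fuzzy matching (this assumes the callable receives each connection's name as a `str`, which the signature does not spell out):

```python
# Exact-name match:
prod = workspace.list_connections(name="Prod Sync")

# Substring match via a predicate:
prod_like = workspace.list_connections(
    name_filter=lambda n: "prod" in n.lower(),
)
```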

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

List all destinations in the workspace.

TODO: Add pagination support
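
The same `name` / `name_filter` pattern applies to sources and destinations; for example (the names reference the illustrative deployments above):

```python
faker_sources = workspace.list_sources(name="My Faker Source")
duckdb_destinations = workspace.list_destinations(
    name_filter=lambda n: "duckdb" in n.lower(),
)
```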

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
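
For illustration, a sketch publishing a YAML definition from a local manifest file (the path and testing values are placeholders):

```python
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),  # A dict or YAML string also works
    pre_validate=True,  # Validate the manifest client-side before uploading
    testing_values={"api_key": "test-key"},  # Placeholder Builder testing config
)
```
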
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type
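
Example (a minimal sketch, assuming `workspace` is an existing CloudWorkspace object):

```python
# List all YAML manifest-based custom source definitions in the workspace:
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition)
```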

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
667    def get_custom_source_definition(
668        self,
669        definition_id: str,
670        *,
671        definition_type: Literal["yaml", "docker"],
672    ) -> CustomCloudSourceDefinition:
673        """Get a specific custom source definition by ID.
674
675        Args:
676            definition_id: The definition ID
677            definition_type: Connector type ("yaml" or "docker"). Required.
678
679        Returns:
680            CustomCloudSourceDefinition object
681        """
682        if definition_type == "yaml":
683            result = api_util.get_custom_yaml_source_definition(
684                workspace_id=self.workspace_id,
685                definition_id=definition_id,
686                api_root=self.api_root,
687                client_id=self.client_id,
688                client_secret=self.client_secret,
689            )
690            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
691
692        raise NotImplementedError(
693            "Docker custom source definitions are not yet supported. "
694            "Only YAML manifest-based custom sources are currently available."
695        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object
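
Example (a minimal sketch, assuming `workspace` is an existing CloudWorkspace object; "def-123" is a placeholder definition ID):

```python
definition = workspace.get_custom_source_definition(
    "def-123",
    definition_type="yaml",
)
print(definition)
```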

class CloudConnection:
 20class CloudConnection:
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(self) -> ConnectionResponse:
 59        """Populate the connection with data from the API."""
 60        return api_util.get_connection(
 61            workspace_id=self.workspace.workspace_id,
 62            connection_id=self.connection_id,
 63            api_root=self.workspace.api_root,
 64            client_id=self.workspace.client_id,
 65            client_secret=self.workspace.client_secret,
 66        )
 67
 68    @classmethod
 69    def _from_connection_response(
 70        cls,
 71        workspace: CloudWorkspace,
 72        connection_response: ConnectionResponse,
 73    ) -> CloudConnection:
 74        """Create a CloudConnection from a ConnectionResponse."""
 75        result = cls(
 76            workspace=workspace,
 77            connection_id=connection_response.connection_id,
 78            source=connection_response.source_id,
 79            destination=connection_response.destination_id,
 80        )
 81        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
 82        return result
 83
 84    # Properties
 85
 86    @property
 87    def name(self) -> str | None:
 88        """Get the display name of the connection, if available.
 89
 90        E.g. "My Postgres to Snowflake", not the connection ID.
 91        """
 92        if not self._connection_info:
 93            self._connection_info = self._fetch_connection_info()
 94
 95        return self._connection_info.name
 96
 97    @property
 98    def source_id(self) -> str:
 99        """The ID of the source."""
100        if not self._source_id:
101            if not self._connection_info:
102                self._connection_info = self._fetch_connection_info()
103
104            self._source_id = self._connection_info.source_id
105
106        return self._source_id
107
108    @property
109    def source(self) -> CloudSource:
110        """Get the source object."""
111        if self._cloud_source_object:
112            return self._cloud_source_object
113
114        self._cloud_source_object = CloudSource(
115            workspace=self.workspace,
116            connector_id=self.source_id,
117        )
118        return self._cloud_source_object
119
120    @property
121    def destination_id(self) -> str:
122        """The ID of the destination."""
123        if not self._destination_id:
124            if not self._connection_info:
125                self._connection_info = self._fetch_connection_info()
126
127            self._destination_id = self._connection_info.destination_id
128
129        return self._destination_id
130
131    @property
132    def destination(self) -> CloudDestination:
133        """Get the destination object."""
134        if self._cloud_destination_object:
135            return self._cloud_destination_object
136
137        self._cloud_destination_object = CloudDestination(
138            workspace=self.workspace,
139            connector_id=self.destination_id,
140        )
141        return self._cloud_destination_object
142
143    @property
144    def stream_names(self) -> list[str]:
145        """The stream names."""
146        if not self._connection_info:
147            self._connection_info = self._fetch_connection_info()
148
149        return [stream.name for stream in self._connection_info.configurations.streams or []]
150
151    @property
152    def table_prefix(self) -> str:
153        """The table prefix."""
154        if not self._connection_info:
155            self._connection_info = self._fetch_connection_info()
156
157        return self._connection_info.prefix or ""
158
159    @property
160    def connection_url(self) -> str | None:
161        """The web URL to the connection."""
162        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
163
164    @property
165    def job_history_url(self) -> str | None:
166        """The URL to the job history for the connection."""
167        return f"{self.connection_url}/timeline"
168
169    # Run Sync
170
171    def run_sync(
172        self,
173        *,
174        wait: bool = True,
175        wait_timeout: int = 300,
176    ) -> SyncResult:
177        """Run a sync."""
178        connection_response = api_util.run_connection(
179            connection_id=self.connection_id,
180            api_root=self.workspace.api_root,
181            workspace_id=self.workspace.workspace_id,
182            client_id=self.workspace.client_id,
183            client_secret=self.workspace.client_secret,
184        )
185        sync_result = SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=connection_response.job_id,
189        )
190
191        if wait:
192            sync_result.wait_for_completion(
193                wait_timeout=wait_timeout,
194                raise_failure=True,
195                raise_timeout=True,
196            )
197
198        return sync_result
199
200    def __repr__(self) -> str:
201        """String representation of the connection."""
202        return (
203            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
204            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
205        )
206
207    # Logs
208
209    def get_previous_sync_logs(
210        self,
211        *,
212        limit: int = 10,
213    ) -> list[SyncResult]:
214        """Get the previous sync logs for a connection."""
215        sync_logs: list[JobResponse] = api_util.get_job_logs(
216            connection_id=self.connection_id,
217            api_root=self.workspace.api_root,
218            workspace_id=self.workspace.workspace_id,
219            limit=limit,
220            client_id=self.workspace.client_id,
221            client_secret=self.workspace.client_secret,
222        )
223        return [
224            SyncResult(
225                workspace=self.workspace,
226                connection=self,
227                job_id=sync_log.job_id,
228                _latest_job_info=sync_log,
229            )
230            for sync_log in sync_logs
231        ]
232
233    def get_sync_result(
234        self,
235        job_id: int | None = None,
236    ) -> SyncResult | None:
237        """Get the sync result for the connection.
238
239        If `job_id` is not provided, the most recent sync job will be used.
240
241        Returns `None` if job_id is omitted and no previous jobs are found.
242        """
243        if job_id is None:
244            # Get the most recent sync job
245            results = self.get_previous_sync_logs(
246                limit=1,
247            )
248            if results:
249                return results[0]
250
251            return None
252
253        # Get the sync job by ID (lazy loaded)
254        return SyncResult(
255            workspace=self.workspace,
256            connection=self,
257            job_id=job_id,
258        )
259
260    def rename(self, name: str) -> CloudConnection:
261        """Rename the connection.
262
263        Args:
264            name: New name for the connection
265
266        Returns:
267            Updated CloudConnection object with refreshed info
268        """
269        updated_response = api_util.patch_connection(
270            connection_id=self.connection_id,
271            api_root=self.workspace.api_root,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            name=name,
275        )
276        self._connection_info = updated_response
277        return self
278
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self
297
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self
321
322    # Deletions
323
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
26    def __init__(
27        self,
28        workspace: CloudWorkspace,
29        connection_id: str,
30        source: str | None = None,
31        destination: str | None = None,
32    ) -> None:
33        """It is not recommended to create a `CloudConnection` object directly.
34
35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
36        """
37        self.connection_id = connection_id
38        """The ID of the connection."""
39
40        self.workspace = workspace
41        """The workspace that the connection belongs to."""
42
43        self._source_id = source
44        """The ID of the source."""
45
46        self._destination_id = destination
47        """The ID of the destination."""
48
49        self._connection_info: ConnectionResponse | None = None
50        """The connection info object. (Cached.)"""
51
52        self._cloud_source_object: CloudSource | None = None
53        """The source object. (Cached.)"""
54
55        self._cloud_destination_object: CloudDestination | None = None
56        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

name: str | None
86    @property
87    def name(self) -> str | None:
88        """Get the display name of the connection, if available.
89
90        E.g. "My Postgres to Snowflake", not the connection ID.
91        """
92        if not self._connection_info:
93            self._connection_info = self._fetch_connection_info()
94
95        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
 97    @property
 98    def source_id(self) -> str:
 99        """The ID of the source."""
100        if not self._source_id:
101            if not self._connection_info:
102                self._connection_info = self._fetch_connection_info()
103
104            self._source_id = self._connection_info.source_id
105
106        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
108    @property
109    def source(self) -> CloudSource:
110        """Get the source object."""
111        if self._cloud_source_object:
112            return self._cloud_source_object
113
114        self._cloud_source_object = CloudSource(
115            workspace=self.workspace,
116            connector_id=self.source_id,
117        )
118        return self._cloud_source_object

Get the source object.

destination_id: str
120    @property
121    def destination_id(self) -> str:
122        """The ID of the destination."""
123        if not self._destination_id:
124            if not self._connection_info:
125                self._connection_info = self._fetch_connection_info()
126
127            self._destination_id = self._connection_info.destination_id
128
129        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
131    @property
132    def destination(self) -> CloudDestination:
133        """Get the destination object."""
134        if self._cloud_destination_object:
135            return self._cloud_destination_object
136
137        self._cloud_destination_object = CloudDestination(
138            workspace=self.workspace,
139            connector_id=self.destination_id,
140        )
141        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
143    @property
144    def stream_names(self) -> list[str]:
145        """The stream names."""
146        if not self._connection_info:
147            self._connection_info = self._fetch_connection_info()
148
149        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
151    @property
152    def table_prefix(self) -> str:
153        """The table prefix."""
154        if not self._connection_info:
155            self._connection_info = self._fetch_connection_info()
156
157        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
159    @property
160    def connection_url(self) -> str | None:
161        """The web URL to the connection."""
162        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
164    @property
165    def job_history_url(self) -> str | None:
166        """The URL to the job history for the connection."""
167        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
171    def run_sync(
172        self,
173        *,
174        wait: bool = True,
175        wait_timeout: int = 300,
176    ) -> SyncResult:
177        """Run a sync."""
178        connection_response = api_util.run_connection(
179            connection_id=self.connection_id,
180            api_root=self.workspace.api_root,
181            workspace_id=self.workspace.workspace_id,
182            client_id=self.workspace.client_id,
183            client_secret=self.workspace.client_secret,
184        )
185        sync_result = SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=connection_response.job_id,
189        )
190
191        if wait:
192            sync_result.wait_for_completion(
193                wait_timeout=wait_timeout,
194                raise_failure=True,
195                raise_timeout=True,
196            )
197
198        return sync_result

Run a sync.
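
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object; here the sync is triggered without blocking, then awaited explicitly):

```python
# Trigger the sync but don't block on it:
sync_result = connection.run_sync(wait=False)

# Later, wait up to 30 minutes and raise if the job fails:
final_status = sync_result.wait_for_completion(
    wait_timeout=1800,
    raise_failure=True,
)
print(final_status)
```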

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
209    def get_previous_sync_logs(
210        self,
211        *,
212        limit: int = 10,
213    ) -> list[SyncResult]:
214        """Get the previous sync logs for a connection."""
215        sync_logs: list[JobResponse] = api_util.get_job_logs(
216            connection_id=self.connection_id,
217            api_root=self.workspace.api_root,
218            workspace_id=self.workspace.workspace_id,
219            limit=limit,
220            client_id=self.workspace.client_id,
221            client_secret=self.workspace.client_secret,
222        )
223        return [
224            SyncResult(
225                workspace=self.workspace,
226                connection=self,
227                job_id=sync_log.job_id,
228                _latest_job_info=sync_log,
229            )
230            for sync_log in sync_logs
231        ]

Get the previous sync logs for a connection.
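
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object):

```python
# Inspect the five most recent sync jobs for this connection:
for sync_result in connection.get_previous_sync_logs(limit=5):
    print(sync_result.job_id, sync_result.get_job_status())
```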

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
233    def get_sync_result(
234        self,
235        job_id: int | None = None,
236    ) -> SyncResult | None:
237        """Get the sync result for the connection.
238
239        If `job_id` is not provided, the most recent sync job will be used.
240
241        Returns `None` if job_id is omitted and no previous jobs are found.
242        """
243        if job_id is None:
244            # Get the most recent sync job
245            results = self.get_previous_sync_logs(
246                limit=1,
247            )
248            if results:
249                return results[0]
250
251            return None
252
253        # Get the sync job by ID (lazy loaded)
254        return SyncResult(
255            workspace=self.workspace,
256            connection=self,
257            job_id=job_id,
258        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
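
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object; 12345 is a placeholder job ID):

```python
# Most recent job, or None if the connection has never synced:
latest = connection.get_sync_result()
if latest is not None:
    print(latest.get_job_status())

# A specific job by ID (lazy loaded; no API call until needed):
specific = connection.get_sync_result(job_id=12345)
```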

def rename(self, name: str) -> CloudConnection:
260    def rename(self, name: str) -> CloudConnection:
261        """Rename the connection.
262
263        Args:
264            name: New name for the connection
265
266        Returns:
267            Updated CloudConnection object with refreshed info
268        """
269        updated_response = api_util.patch_connection(
270            connection_id=self.connection_id,
271            api_root=self.workspace.api_root,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            name=name,
275        )
276        self._connection_info = updated_response
277        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info
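
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object):

```python
connection.rename("Postgres to Snowflake (Prod)")
print(connection.name)  # Reflects the refreshed connection info
```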

def set_table_prefix(self, prefix: str) -> CloudConnection:
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info
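
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object):

```python
connection.set_table_prefix("staging_")
print(connection.table_prefix)  # "staging_"
```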

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
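
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object; "users" and "orders" are placeholder stream names):

```python
# Destructive: replaces the current stream selection entirely.
connection.set_selected_streams(["users", "orders"])
print(connection.stream_names)
```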

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
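
Example (a minimal sketch, assuming `connection` is an existing CloudConnection object; this operation is irreversible):

```python
connection.permanently_delete(
    cascade_delete_source=True,  # Also delete the source
    cascade_delete_destination=True,  # Also delete the destination
)
```
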
@dataclass
class SyncResult:
218@dataclass
219class SyncResult:
220    """The result of a sync operation.
221
222    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
223    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
224    """
225
226    workspace: CloudWorkspace
227    connection: CloudConnection
228    job_id: int
229    table_name_prefix: str = ""
230    table_name_suffix: str = ""
231    _latest_job_info: JobResponse | None = None
232    _connection_response: ConnectionResponse | None = None
233    _cache: CacheBase | None = None
234    _job_with_attempts_info: dict[str, Any] | None = None
235
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"
247
248    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
249        """Return connection info for the sync job."""
250        if self._connection_response and not force_refresh:
251            return self._connection_response
252
253        self._connection_response = api_util.get_connection(
254            workspace_id=self.workspace.workspace_id,
255            api_root=self.workspace.api_root,
256            connection_id=self.connection.connection_id,
257            client_id=self.workspace.client_id,
258            client_secret=self.workspace.client_secret,
259        )
260        return self._connection_response
261
262    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
263        """Return the destination configuration for the sync job."""
264        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
265        destination_response = api_util.get_destination(
266            destination_id=connection_info.destination_id,
267            api_root=self.workspace.api_root,
268            client_id=self.workspace.client_id,
269            client_secret=self.workspace.client_secret,
270        )
271        return asdict(destination_response.configuration)
272
273    def is_job_complete(self) -> bool:
274        """Check if the sync job is complete."""
275        return self.get_job_status() in FINAL_STATUSES
276
277    def get_job_status(self) -> JobStatusEnum:
278        """Check if the sync job is still running."""
279        return self._fetch_latest_job_info().status
280
281    def _fetch_latest_job_info(self) -> JobResponse:
282        """Return the job info for the sync job."""
283        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
284            return self._latest_job_info
285
286        self._latest_job_info = api_util.get_job_info(
287            job_id=self.job_id,
288            api_root=self.workspace.api_root,
289            client_id=self.workspace.client_id,
290            client_secret=self.workspace.client_secret,
291        )
292        return self._latest_job_info
293
294    @property
295    def bytes_synced(self) -> int:
296        """Return the number of records processed."""
297        return self._fetch_latest_job_info().bytes_synced or 0
298
299    @property
300    def records_synced(self) -> int:
301        """Return the number of records processed."""
302        return self._fetch_latest_job_info().rows_synced or 0
303
304    @property
305    def start_time(self) -> datetime:
306        """Return the start time of the sync job in UTC."""
307        try:
308            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
309        except (ValueError, TypeError) as e:
310            if "Invalid isoformat string" in str(e):
311                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
312                    api_root=self.workspace.api_root,
313                    path="/jobs/get",
314                    json={"id": self.job_id},
315                    client_id=self.workspace.client_id,
316                    client_secret=self.workspace.client_secret,
317                )
318                raw_start_time = job_info_raw.get("startTime")
319                if raw_start_time:
320                    return ab_datetime_parse(raw_start_time)
321            raise
322
323    def _fetch_job_with_attempts(self) -> dict[str, Any]:
324        """Fetch job info with attempts from Config API using lazy loading pattern."""
325        if self._job_with_attempts_info is not None:
326            return self._job_with_attempts_info
327
328        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
329            api_root=self.workspace.api_root,
330            path="/jobs/get",
331            json={
332                "id": self.job_id,
333            },
334            client_id=self.workspace.client_id,
335            client_secret=self.workspace.client_secret,
336        )
337        return self._job_with_attempts_info
338
339    def get_attempts(self) -> list[SyncAttempt]:
340        """Return a list of attempts for this sync job."""
341        job_with_attempts = self._fetch_job_with_attempts()
342        attempts_data = job_with_attempts.get("attempts", [])
343
344        return [
345            SyncAttempt(
346                workspace=self.workspace,
347                connection=self.connection,
348                job_id=self.job_id,
349                attempt_number=i,
350                _attempt_data=attempt_data,
351            )
352            for i, attempt_data in enumerate(attempts_data, start=0)
353        ]
354
355    def raise_failure_status(
356        self,
357        *,
358        refresh_status: bool = False,
359    ) -> None:
360        """Raise an exception if the sync job failed.
361
362        By default, this method will use the latest status available. If you want to refresh the
363        status before checking for failure, set `refresh_status=True`. If the job has failed, this
364        method will raise an `AirbyteConnectionSyncError`.
365
366        Otherwise, do nothing.
367        """
368        if not refresh_status and self._latest_job_info:
369            latest_status = self._latest_job_info.status
370        else:
371            latest_status = self.get_job_status()
372
373        if latest_status in FAILED_STATUSES:
374            raise AirbyteConnectionSyncError(
375                workspace=self.workspace,
376                connection_id=self.connection.connection_id,
377                job_id=self.job_id,
378                job_status=self.get_job_status(),
379            )
380
381    def wait_for_completion(
382        self,
383        *,
384        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
385        raise_timeout: bool = True,
386        raise_failure: bool = False,
387    ) -> JobStatusEnum:
388        """Wait for a job to finish running."""
389        start_time = time.time()
390        while True:
391            latest_status = self.get_job_status()
392            if latest_status in FINAL_STATUSES:
393                if raise_failure:
394                    # No-op if the job succeeded or is still running:
395                    self.raise_failure_status()
396
397                return latest_status
398
399            if time.time() - start_time > wait_timeout:
400                if raise_timeout:
401                    raise AirbyteConnectionSyncTimeoutError(
402                        workspace=self.workspace,
403                        connection_id=self.connection.connection_id,
404                        job_id=self.job_id,
405                        job_status=latest_status,
406                        timeout=wait_timeout,
407                    )
408
409                return latest_status  # This will be a non-final status
410
411            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
412
413    def get_sql_cache(self) -> CacheBase:
414        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
415        if self._cache:
416            return self._cache
417
418        destination_configuration = self._get_destination_configuration()
419        self._cache = destination_to_cache(destination_configuration=destination_configuration)
420        return self._cache
421
422    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
423        """Return a SQL Engine for querying a SQL-based destination."""
424        return self.get_sql_cache().get_sql_engine()
425
426    def get_sql_table_name(self, stream_name: str) -> str:
427        """Return the SQL table name of the named stream."""
428        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
429
430    def get_sql_table(
431        self,
432        stream_name: str,
433    ) -> sqlalchemy.Table:
434        """Return a SQLAlchemy table object for the named stream."""
435        return self.get_sql_cache().processor.get_sql_table(stream_name)
436
437    def get_dataset(self, stream_name: str) -> CachedDataset:
438        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
439
440        This can be used to read and analyze the data in a SQL-based destination.
441
442        TODO: In a future iteration, we can consider providing stream configuration information
443              (catalog information) to the `CachedDataset` object via the "Get stream properties"
444              API: https://reference.airbyte.com/reference/getstreamproperties
445        """
446        return CachedDataset(
447            self.get_sql_cache(),
448            stream_name=stream_name,
449            stream_configuration=False,  # Don't look for stream configuration in cache.
450        )
451
452    def get_sql_database_name(self) -> str:
453        """Return the SQL database name."""
454        cache = self.get_sql_cache()
455        return cache.get_database_name()
456
457    def get_sql_schema_name(self) -> str:
458        """Return the SQL schema name."""
459        cache = self.get_sql_cache()
460        return cache.schema_name
461
462    @property
463    def stream_names(self) -> list[str]:
464        """Return the set of stream names."""
465        return self.connection.stream_names
466
467    @final
468    @property
469    def streams(
470        self,
471    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
472        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
473
474        This is a convenience wrapper around the `stream_names`
475        property and `get_dataset()` method.
476        """
477        return self._SyncResultStreams(self)
478
479    class _SyncResultStreams(Mapping[str, CachedDataset]):
480        """A mapping of stream names to cached datasets."""
481
482        def __init__(
483            self,
484            parent: SyncResult,
485            /,
486        ) -> None:
487            self.parent: SyncResult = parent
488
489        def __getitem__(self, key: str) -> CachedDataset:
490            return self.parent.get_dataset(stream_name=key)
491
492        def __iter__(self) -> Iterator[str]:
493            return iter(self.parent.stream_names)
494
495        def __len__(self) -> int:
496            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:
273    def is_job_complete(self) -> bool:
274        """Check if the sync job is complete."""
275        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
277    def get_job_status(self) -> JobStatusEnum:
278        """Check if the sync job is still running."""
279        return self._fetch_latest_job_info().status

Return the current status of the sync job.

bytes_synced: int
294    @property
295    def bytes_synced(self) -> int:
296        """Return the number of records processed."""
297        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
299    @property
300    def records_synced(self) -> int:
301        """Return the number of records processed."""
302        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.

start_time: datetime.datetime
304    @property
305    def start_time(self) -> datetime:
306        """Return the start time of the sync job in UTC."""
307        try:
308            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
309        except (ValueError, TypeError) as e:
310            if "Invalid isoformat string" in str(e):
311                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
312                    api_root=self.workspace.api_root,
313                    path="/jobs/get",
314                    json={"id": self.job_id},
315                    client_id=self.workspace.client_id,
316                    client_secret=self.workspace.client_secret,
317                )
318                raw_start_time = job_info_raw.get("startTime")
319                if raw_start_time:
320                    return ab_datetime_parse(raw_start_time)
321            raise

Return the start time of the sync job in UTC.

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
339    def get_attempts(self) -> list[SyncAttempt]:
340        """Return a list of attempts for this sync job."""
341        job_with_attempts = self._fetch_job_with_attempts()
342        attempts_data = job_with_attempts.get("attempts", [])
343
344        return [
345            SyncAttempt(
346                workspace=self.workspace,
347                connection=self.connection,
348                job_id=self.job_id,
349                attempt_number=i,
350                _attempt_data=attempt_data,
351            )
352            for i, attempt_data in enumerate(attempts_data, start=0)
353        ]

Return a list of attempts for this sync job.
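
Example (a minimal sketch, assuming `sync_result` is an existing SyncResult object):

```python
for attempt in sync_result.get_attempts():
    print(attempt.attempt_number)
```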

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
355    def raise_failure_status(
356        self,
357        *,
358        refresh_status: bool = False,
359    ) -> None:
360        """Raise an exception if the sync job failed.
361
362        By default, this method will use the latest status available. If you want to refresh the
363        status before checking for failure, set `refresh_status=True`. If the job has failed, this
364        method will raise an `AirbyteConnectionSyncError`.
365
366        Otherwise, do nothing.
367        """
368        if not refresh_status and self._latest_job_info:
369            latest_status = self._latest_job_info.status
370        else:
371            latest_status = self.get_job_status()
372
373        if latest_status in FAILED_STATUSES:
374            raise AirbyteConnectionSyncError(
375                workspace=self.workspace,
376                connection_id=self.connection.connection_id,
377                job_id=self.job_id,
378                job_status=self.get_job_status(),
379            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
381    def wait_for_completion(
382        self,
383        *,
384        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
385        raise_timeout: bool = True,
386        raise_failure: bool = False,
387    ) -> JobStatusEnum:
388        """Wait for a job to finish running."""
389        start_time = time.time()
390        while True:
391            latest_status = self.get_job_status()
392            if latest_status in FINAL_STATUSES:
393                if raise_failure:
394                    # No-op if the job succeeded or is still running:
395                    self.raise_failure_status()
396
397                return latest_status
398
399            if time.time() - start_time > wait_timeout:
400                if raise_timeout:
401                    raise AirbyteConnectionSyncTimeoutError(
402                        workspace=self.workspace,
403                        connection_id=self.connection.connection_id,
404                        job_id=self.job_id,
405                        job_status=latest_status,
406                        timeout=wait_timeout,
407                    )
408
409                return latest_status  # This will be a non-final status
410
411            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
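
Example (a minimal sketch, assuming `sync_result` is an existing SyncResult object):

```python
from airbyte.cloud import JobStatusEnum

# Poll for up to ten minutes; return the (possibly non-final) status
# instead of raising on timeout:
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,
)
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
```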

def get_sql_cache(self) -> airbyte.caches.CacheBase:
413    def get_sql_cache(self) -> CacheBase:
414        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
415        if self._cache:
416            return self._cache
417
418        destination_configuration = self._get_destination_configuration()
419        self._cache = destination_to_cache(destination_configuration=destination_configuration)
420        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
422    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
423        """Return a SQL Engine for querying a SQL-based destination."""
424        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
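
Example (a minimal sketch, assuming `sync_result` is an existing SyncResult object with a SQL-based destination; "users" is a placeholder stream name):

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(count)
```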

def get_sql_table_name(self, stream_name: str) -> str:
426    def get_sql_table_name(self, stream_name: str) -> str:
427        """Return the SQL table name of the named stream."""
428        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
430    def get_sql_table(
431        self,
432        stream_name: str,
433    ) -> sqlalchemy.Table:
434        """Return a SQLAlchemy table object for the named stream."""
435        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
437    def get_dataset(self, stream_name: str) -> CachedDataset:
438        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
439
440        This can be used to read and analyze the data in a SQL-based destination.
441
442        TODO: In a future iteration, we can consider providing stream configuration information
443              (catalog information) to the `CachedDataset` object via the "Get stream properties"
444              API: https://reference.airbyte.com/reference/getstreamproperties
445        """
446        return CachedDataset(
447            self.get_sql_cache(),
448            stream_name=stream_name,
449            stream_configuration=False,  # Don't look for stream configuration in cache.
450        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:
452    def get_sql_database_name(self) -> str:
453        """Return the SQL database name."""
454        cache = self.get_sql_cache()
455        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
457    def get_sql_schema_name(self) -> str:
458        """Return the SQL schema name."""
459        cache = self.get_sql_cache()
460        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
462    @property
463    def stream_names(self) -> list[str]:
464        """Return the set of stream names."""
465        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
467    @final
468    @property
469    def streams(
470        self,
471    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
472        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
473
474        This is a convenience wrapper around the `stream_names`
475        property and `get_dataset()` method.
476        """
477        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
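
Example (a minimal sketch, assuming `sync_result` is an existing SyncResult object with a SQL-based destination; `to_pandas()` is assumed available on the returned datasets):

```python
for stream_name, dataset in sync_result.streams.items():
    df = dataset.to_pandas()  # Assumed: CachedDataset.to_pandas()
    print(f"{stream_name}: {len(df)} records")
```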

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

Enumeration of possible Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
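
Example (a minimal sketch, assuming `sync_result` is an existing SyncResult object):

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status in {JobStatusEnum.FAILED, JobStatusEnum.CANCELLED}:
    print(f"Job did not succeed: {status}")
```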