airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Reading From a Cloud Destination

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently, only Snowflake and BigQuery destinations are supported.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API."""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
```
@dataclass
class CloudWorkspace:
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)
```

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
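
For self-managed OSS or Enterprise deployments, point `api_root` at your own instance. A minimal sketch; the URL and secret names below are placeholders:

```python
import airbyte as ab
from airbyte import cloud

# Placeholder URL for a self-managed Airbyte deployment.
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    api_root="https://airbyte.example.com/api/public/v1",
)
```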

CloudWorkspace(workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
```python
    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
```

The web URL of the workspace.

def connect(self) -> None:
```python
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")
```

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
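
The sketch below assumes the `workspace` object from the examples above. Since `connect()` makes a single `get_workspace` call, it doubles as a credentials check at startup:

```python
# Raises an exception if the workspace is unreachable or credentials are bad;
# on success it prints the workspace URL.
workspace.connect()
```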

def get_connection(self, connection_id: str) -> CloudConnection:
```python
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
```

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.
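
Because the returned object is lazy, this call is cheap. A sketch of the behavior, using the placeholder IDs from the module examples:

```python
connection = workspace.get_connection(connection_id="456")  # no API call yet
print(connection.connection_url)  # built locally from IDs; still no API call
print(connection.name)            # first access of fetched data triggers the API call
```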

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
```python
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )
```

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
```python
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )
```

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source(self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:

```python
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )
```

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
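
A sketch of deploying a locally configured source; the connector name and config here are illustrative:

```python
import airbyte as ab

# Configure a source locally, then deploy it to the workspace.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # avoids tripping the duplicate-name check
)
print(cloud_source.connector_id)
```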
def deploy_destination(self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:

```python
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )
```

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
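
When passing a plain dict, `destinationType` must be included or a `PyAirbyteInputError` is raised. A sketch; the connector type and config keys are illustrative:

```python
# Deploy a destination from a raw configuration dictionary.
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",                  # required when passing a dict
        "destination_path": "/local/airbyte.duckdb",  # illustrative config key
    },
)
```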
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
```python
    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
```

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination(self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:

```python
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
```

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

def deploy_connection(self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:

```python
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )
```

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
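
Putting it together, a sketch that reuses the `cloud_source` and `cloud_destination` objects from the deploy examples above:

```python
connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,            # or a source ID string
    destination=cloud_destination,  # or a destination ID string
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
print(connection.connection_url)
```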
def permanently_delete_connection(self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:

```python
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)
```

Delete a deployed connection from the workspace.
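
By default only the connection record is deleted; the cascade flags also remove the underlying connectors. A sketch:

```python
workspace.permanently_delete_connection(
    connection,                       # or a connection ID string
    cascade_delete_source=True,       # also delete the deployed source
    cascade_delete_destination=True,  # also delete the deployed destination
)
```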

def list_connections(self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:

```python
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]
```

List connections by name in the workspace.

TODO: Add pagination support
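
A sketch of both filtering styles; this assumes the `name_filter` predicate receives each connection's display name:

```python
# Exact-name match:
named = workspace.list_connections(name="Faker to DuckDB")

# Predicate match:
nightly = workspace.list_connections(
    name_filter=lambda n: n.startswith("nightly-"),
)
```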

def list_sources(self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:

```python
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]
```

List all sources in the workspace.

TODO: Add pagination support

def list_destinations(self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:

```python
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]
```

List all destinations in the workspace.

TODO: Add pagination support

def publish_custom_source_definition(self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:

```python
    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
```

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
Returns:
  CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
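
A sketch of publishing a low-code manifest from disk; the file path is a placeholder:

```python
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),  # a dict or a YAML string also works
    pre_validate=True,  # catch manifest errors client-side before publishing
)
```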
def list_custom_source_definitions(self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:

```python
    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
```

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
  List of CustomCloudSourceDefinition objects matching the specified type
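
For example, to list the YAML-based definitions in the workspace:

```python
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)
```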

def get_custom_source_definition(self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:

```python
    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )
```

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
  CustomCloudSourceDefinition object

class CloudConnection:
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            name=name,
        )
        self._connection_info = updated_response
        return self
```
278
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self
297
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self
321
322    # Deletions
323
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
26    def __init__(
27        self,
28        workspace: CloudWorkspace,
29        connection_id: str,
30        source: str | None = None,
31        destination: str | None = None,
32    ) -> None:
33        """It is not recommended to create a `CloudConnection` object directly.
34
35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
36        """
37        self.connection_id = connection_id
38        """The ID of the connection."""
39
40        self.workspace = workspace
41        """The workspace that the connection belongs to."""
42
43        self._source_id = source
44        """The ID of the source."""
45
46        self._destination_id = destination
47        """The ID of the destination."""
48
49        self._connection_info: ConnectionResponse | None = None
50        """The connection info object. (Cached.)"""
51
52        self._cloud_source_object: CloudSource | None = None
53        """The source object. (Cached.)"""
54
55        self._cloud_destination_object: CloudDestination | None = None
56        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

name: str | None
86    @property
87    def name(self) -> str | None:
88        """Get the display name of the connection, if available.
89
90        E.g. "My Postgres to Snowflake", not the connection ID.
91        """
92        if not self._connection_info:
93            self._connection_info = self._fetch_connection_info()
94
95        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
 97    @property
 98    def source_id(self) -> str:
 99        """The ID of the source."""
100        if not self._source_id:
101            if not self._connection_info:
102                self._connection_info = self._fetch_connection_info()
103
104            self._source_id = self._connection_info.source_id
105
106        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
108    @property
109    def source(self) -> CloudSource:
110        """Get the source object."""
111        if self._cloud_source_object:
112            return self._cloud_source_object
113
114        self._cloud_source_object = CloudSource(
115            workspace=self.workspace,
116            connector_id=self.source_id,
117        )
118        return self._cloud_source_object

Get the source object.

destination_id: str
120    @property
121    def destination_id(self) -> str:
122        """The ID of the destination."""
123        if not self._destination_id:
124            if not self._connection_info:
125                self._connection_info = self._fetch_connection_info()
126
127            self._destination_id = self._connection_info.destination_id
128
129        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
131    @property
132    def destination(self) -> CloudDestination:
133        """Get the destination object."""
134        if self._cloud_destination_object:
135            return self._cloud_destination_object
136
137        self._cloud_destination_object = CloudDestination(
138            workspace=self.workspace,
139            connector_id=self.destination_id,
140        )
141        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
143    @property
144    def stream_names(self) -> list[str]:
145        """The stream names."""
146        if not self._connection_info:
147            self._connection_info = self._fetch_connection_info()
148
149        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
151    @property
152    def table_prefix(self) -> str:
153        """The table prefix."""
154        if not self._connection_info:
155            self._connection_info = self._fetch_connection_info()
156
157        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
159    @property
160    def connection_url(self) -> str | None:
161        """The web URL to the connection."""
162        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
164    @property
165    def job_history_url(self) -> str | None:
166        """The URL to the job history for the connection."""
167        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
171    def run_sync(
172        self,
173        *,
174        wait: bool = True,
175        wait_timeout: int = 300,
176    ) -> SyncResult:
177        """Run a sync."""
178        connection_response = api_util.run_connection(
179            connection_id=self.connection_id,
180            api_root=self.workspace.api_root,
181            workspace_id=self.workspace.workspace_id,
182            client_id=self.workspace.client_id,
183            client_secret=self.workspace.client_secret,
184        )
185        sync_result = SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=connection_response.job_id,
189        )
190
191        if wait:
192            sync_result.wait_for_completion(
193                wait_timeout=wait_timeout,
194                raise_failure=True,
195                raise_timeout=True,
196            )
197
198        return sync_result

Run a sync.
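
A minimal sketch (assuming an existing `connection` object, as in the module-level examples) that starts a sync without blocking and waits for it separately:

```python
# Start the sync but don't block on completion:
sync_result = connection.run_sync(wait=False)

# ... do other work, then block until the job reaches a final status:
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,
)
print(f"Sync finished with status: {final_status}")
```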

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
209    def get_previous_sync_logs(
210        self,
211        *,
212        limit: int = 10,
213    ) -> list[SyncResult]:
214        """Get the previous sync logs for a connection."""
215        sync_logs: list[JobResponse] = api_util.get_job_logs(
216            connection_id=self.connection_id,
217            api_root=self.workspace.api_root,
218            workspace_id=self.workspace.workspace_id,
219            limit=limit,
220            client_id=self.workspace.client_id,
221            client_secret=self.workspace.client_secret,
222        )
223        return [
224            SyncResult(
225                workspace=self.workspace,
226                connection=self,
227                job_id=sync_log.job_id,
228                _latest_job_info=sync_log,
229            )
230            for sync_log in sync_logs
231        ]

Get the previous sync logs for a connection.
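
For example, a short sketch (assuming an existing `connection` object) that reviews the most recent jobs:

```python
# Inspect the five most recent sync jobs for this connection:
for sync_result in connection.get_previous_sync_logs(limit=5):
    print(
        f"Job {sync_result.job_id}: {sync_result.get_job_status()} "
        f"({sync_result.records_synced} records, {sync_result.bytes_synced} bytes)"
    )
```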

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
233    def get_sync_result(
234        self,
235        job_id: int | None = None,
236    ) -> SyncResult | None:
237        """Get the sync result for the connection.
238
239        If `job_id` is not provided, the most recent sync job will be used.
240
241        Returns `None` if job_id is omitted and no previous jobs are found.
242        """
243        if job_id is None:
244            # Get the most recent sync job
245            results = self.get_previous_sync_logs(
246                limit=1,
247            )
248            if results:
249                return results[0]
250
251            return None
252
253        # Get the sync job by ID (lazy loaded)
254        return SyncResult(
255            workspace=self.workspace,
256            connection=self,
257            job_id=job_id,
258        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
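
A minimal sketch (assuming an existing `connection` object; the explicit job ID below is illustrative):

```python
# Fetch the most recent sync result, if any:
latest = connection.get_sync_result()
if latest is None:
    print("No previous sync jobs found.")
else:
    print(f"Job {latest.job_id} status: {latest.get_job_status()}")

# Or reference a specific job by ID (lazily loaded; ID is illustrative):
older = connection.get_sync_result(job_id=12345)
```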

def rename(self, name: str) -> CloudConnection:
260    def rename(self, name: str) -> CloudConnection:
261        """Rename the connection.
262
263        Args:
264            name: New name for the connection
265
266        Returns:
267            Updated CloudConnection object with refreshed info
268        """
269        updated_response = api_util.patch_connection(
270            connection_id=self.connection_id,
271            api_root=self.workspace.api_root,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            name=name,
275        )
276        self._connection_info = updated_response
277        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info
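
A minimal sketch (assuming an existing `connection` object; the new display name is illustrative):

```python
# Rename the connection; the call returns the same, refreshed object:
connection = connection.rename("Postgres to Snowflake (prod)")
print(connection.name)
```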

def set_table_prefix(self, prefix: str) -> CloudConnection:
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info
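
A minimal sketch (assuming an existing `connection` object). With a `prod_` prefix, a stream named `users` would land in a `prod_users` table in the destination:

```python
# Set a table prefix for all destination tables:
connection = connection.set_table_prefix("prod_")
print(connection.table_prefix)  # "prod_"
```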

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
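
A minimal sketch (assuming an existing `connection` object; the stream names are illustrative and must exist in the source's catalog):

```python
# Replace the stream selection for the connection:
connection = connection.set_selected_streams(["users", "orders"])
print(connection.stream_names)
```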

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
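
A minimal sketch (assuming an existing `connection` object):

```python
# Remove the connection and, optionally, its source and destination:
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=True,
)
```
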
@dataclass
class SyncResult:
218@dataclass
219class SyncResult:
220    """The result of a sync operation.
221
222    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
223    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
224    """
225
226    workspace: CloudWorkspace
227    connection: CloudConnection
228    job_id: int
229    table_name_prefix: str = ""
230    table_name_suffix: str = ""
231    _latest_job_info: JobResponse | None = None
232    _connection_response: ConnectionResponse | None = None
233    _cache: CacheBase | None = None
234    _job_with_attempts_info: dict[str, Any] | None = None
235
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"
247
248    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
249        """Return connection info for the sync job."""
250        if self._connection_response and not force_refresh:
251            return self._connection_response
252
253        self._connection_response = api_util.get_connection(
254            workspace_id=self.workspace.workspace_id,
255            api_root=self.workspace.api_root,
256            connection_id=self.connection.connection_id,
257            client_id=self.workspace.client_id,
258            client_secret=self.workspace.client_secret,
259        )
260        return self._connection_response
261
262    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
263        """Return the destination configuration for the sync job."""
264        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
265        destination_response = api_util.get_destination(
266            destination_id=connection_info.destination_id,
267            api_root=self.workspace.api_root,
268            client_id=self.workspace.client_id,
269            client_secret=self.workspace.client_secret,
270        )
271        return asdict(destination_response.configuration)
272
273    def is_job_complete(self) -> bool:
274        """Check if the sync job is complete."""
275        return self.get_job_status() in FINAL_STATUSES
276
277    def get_job_status(self) -> JobStatusEnum:
278        """Return the status of the sync job."""
279        return self._fetch_latest_job_info().status
280
281    def _fetch_latest_job_info(self) -> JobResponse:
282        """Return the job info for the sync job."""
283        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
284            return self._latest_job_info
285
286        self._latest_job_info = api_util.get_job_info(
287            job_id=self.job_id,
288            api_root=self.workspace.api_root,
289            client_id=self.workspace.client_id,
290            client_secret=self.workspace.client_secret,
291        )
292        return self._latest_job_info
293
294    @property
295    def bytes_synced(self) -> int:
296        """Return the number of bytes synced."""
297        return self._fetch_latest_job_info().bytes_synced or 0
298
299    @property
300    def records_synced(self) -> int:
301        """Return the number of records synced."""
302        return self._fetch_latest_job_info().rows_synced or 0
303
304    @property
305    def start_time(self) -> datetime:
306        """Return the start time of the sync job in UTC."""
307        try:
308            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
309        except (ValueError, TypeError) as e:
310            if "Invalid isoformat string" in str(e):
311                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
312                    api_root=self.workspace.api_root,
313                    path="/jobs/get",
314                    json={"id": self.job_id},
315                    client_id=self.workspace.client_id,
316                    client_secret=self.workspace.client_secret,
317                )
318                raw_start_time = job_info_raw.get("startTime")
319                if raw_start_time:
320                    return ab_datetime_parse(raw_start_time)
321            raise
322
323    def _fetch_job_with_attempts(self) -> dict[str, Any]:
324        """Fetch job info with attempts from the Config API, using a lazy-loading pattern."""
325        if self._job_with_attempts_info is not None:
326            return self._job_with_attempts_info
327
328        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
329            api_root=self.workspace.api_root,
330            path="/jobs/get",
331            json={
332                "id": self.job_id,
333            },
334            client_id=self.workspace.client_id,
335            client_secret=self.workspace.client_secret,
336        )
337        return self._job_with_attempts_info
338
339    def get_attempts(self) -> list[SyncAttempt]:
340        """Return a list of attempts for this sync job."""
341        job_with_attempts = self._fetch_job_with_attempts()
342        attempts_data = job_with_attempts.get("attempts", [])
343
344        return [
345            SyncAttempt(
346                workspace=self.workspace,
347                connection=self.connection,
348                job_id=self.job_id,
349                attempt_number=i,
350                _attempt_data=attempt_data,
351            )
352            for i, attempt_data in enumerate(attempts_data, start=0)
353        ]
354
355    def raise_failure_status(
356        self,
357        *,
358        refresh_status: bool = False,
359    ) -> None:
360        """Raise an exception if the sync job failed.
361
362        By default, this method will use the latest status available. If you want to refresh the
363        status before checking for failure, set `refresh_status=True`. If the job has failed, this
364        method will raise an `AirbyteConnectionSyncError`.
365
366        Otherwise, do nothing.
367        """
368        if not refresh_status and self._latest_job_info:
369            latest_status = self._latest_job_info.status
370        else:
371            latest_status = self.get_job_status()
372
373        if latest_status in FAILED_STATUSES:
374            raise AirbyteConnectionSyncError(
375                workspace=self.workspace,
376                connection_id=self.connection.connection_id,
377                job_id=self.job_id,
378                job_status=self.get_job_status(),
379            )
380
381    def wait_for_completion(
382        self,
383        *,
384        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
385        raise_timeout: bool = True,
386        raise_failure: bool = False,
387    ) -> JobStatusEnum:
388        """Wait for a job to finish running."""
389        start_time = time.time()
390        while True:
391            latest_status = self.get_job_status()
392            if latest_status in FINAL_STATUSES:
393                if raise_failure:
394                    # No-op if the job succeeded or is still running:
395                    self.raise_failure_status()
396
397                return latest_status
398
399            if time.time() - start_time > wait_timeout:
400                if raise_timeout:
401                    raise AirbyteConnectionSyncTimeoutError(
402                        workspace=self.workspace,
403                        connection_id=self.connection.connection_id,
404                        job_id=self.job_id,
405                        job_status=latest_status,
406                        timeout=wait_timeout,
407                    )
408
409                return latest_status  # This will be a non-final status
410
411            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
412
413    def get_sql_cache(self) -> CacheBase:
414        """Return a SQL Cache object for working with the data in a SQL-based destination."""
415        if self._cache:
416            return self._cache
417
418        destination_configuration = self._get_destination_configuration()
419        self._cache = destination_to_cache(destination_configuration=destination_configuration)
420        return self._cache
421
422    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
423        """Return a SQL Engine for querying a SQL-based destination."""
424        return self.get_sql_cache().get_sql_engine()
425
426    def get_sql_table_name(self, stream_name: str) -> str:
427        """Return the SQL table name of the named stream."""
428        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
429
430    def get_sql_table(
431        self,
432        stream_name: str,
433    ) -> sqlalchemy.Table:
434        """Return a SQLAlchemy table object for the named stream."""
435        return self.get_sql_cache().processor.get_sql_table(stream_name)
436
437    def get_dataset(self, stream_name: str) -> CachedDataset:
438        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
439
440        This can be used to read and analyze the data in a SQL-based destination.
441
442        TODO: In a future iteration, we can consider providing stream configuration information
443              (catalog information) to the `CachedDataset` object via the "Get stream properties"
444              API: https://reference.airbyte.com/reference/getstreamproperties
445        """
446        return CachedDataset(
447            self.get_sql_cache(),
448            stream_name=stream_name,
449            stream_configuration=False,  # Don't look for stream configuration in cache.
450        )
451
452    def get_sql_database_name(self) -> str:
453        """Return the SQL database name."""
454        cache = self.get_sql_cache()
455        return cache.get_database_name()
456
457    def get_sql_schema_name(self) -> str:
458        """Return the SQL schema name."""
459        cache = self.get_sql_cache()
460        return cache.schema_name
461
462    @property
463    def stream_names(self) -> list[str]:
464        """Return the list of stream names."""
465        return self.connection.stream_names
466
467    @final
468    @property
469    def streams(
470        self,
471    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
472        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
473
474        This is a convenience wrapper around the `stream_names`
475        property and `get_dataset()` method.
476        """
477        return self._SyncResultStreams(self)
478
479    class _SyncResultStreams(Mapping[str, CachedDataset]):
480        """A mapping of stream names to cached datasets."""
481
482        def __init__(
483            self,
484            parent: SyncResult,
485            /,
486        ) -> None:
487            self.parent: SyncResult = parent
488
489        def __getitem__(self, key: str) -> CachedDataset:
490            return self.parent.get_dataset(stream_name=key)
491
492        def __iter__(self) -> Iterator[str]:
493            return iter(self.parent.stream_names)
494
495        def __len__(self) -> int:
496            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
236    @property
237    def job_url(self) -> str:
238        """Return the URL of the sync job.
239
240        Note: This currently returns the connection's job history URL, as there is no direct URL
241        to a specific job in the Airbyte Cloud web app.
242
243        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
244              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
245        """
246        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:
273    def is_job_complete(self) -> bool:
274        """Check if the sync job is complete."""
275        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
277    def get_job_status(self) -> JobStatusEnum:
278        """Return the status of the sync job."""
279        return self._fetch_latest_job_info().status

Return the status of the sync job.

bytes_synced: int
294    @property
295    def bytes_synced(self) -> int:
296        """Return the number of bytes synced."""
297        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
299    @property
300    def records_synced(self) -> int:
301        """Return the number of records synced."""
302        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.

start_time: datetime.datetime
304    @property
305    def start_time(self) -> datetime:
306        """Return the start time of the sync job in UTC."""
307        try:
308            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
309        except (ValueError, TypeError) as e:
310            if "Invalid isoformat string" in str(e):
311                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
312                    api_root=self.workspace.api_root,
313                    path="/jobs/get",
314                    json={"id": self.job_id},
315                    client_id=self.workspace.client_id,
316                    client_secret=self.workspace.client_secret,
317                )
318                raw_start_time = job_info_raw.get("startTime")
319                if raw_start_time:
320                    return ab_datetime_parse(raw_start_time)
321            raise

Return the start time of the sync job in UTC.

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
339    def get_attempts(self) -> list[SyncAttempt]:
340        """Return a list of attempts for this sync job."""
341        job_with_attempts = self._fetch_job_with_attempts()
342        attempts_data = job_with_attempts.get("attempts", [])
343
344        return [
345            SyncAttempt(
346                workspace=self.workspace,
347                connection=self.connection,
348                job_id=self.job_id,
349                attempt_number=i,
350                _attempt_data=attempt_data,
351            )
352            for i, attempt_data in enumerate(attempts_data, start=0)
353        ]

Return a list of attempts for this sync job.
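
A minimal sketch (assuming an existing `sync_result` object, and that `SyncAttempt` exposes the `job_id` and `attempt_number` fields shown in its constructor as attributes):

```python
# Inspect each attempt recorded for this job:
for attempt in sync_result.get_attempts():
    print(f"Job {attempt.job_id}, attempt #{attempt.attempt_number}")
```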

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
355    def raise_failure_status(
356        self,
357        *,
358        refresh_status: bool = False,
359    ) -> None:
360        """Raise an exception if the sync job failed.
361
362        By default, this method will use the latest status available. If you want to refresh the
363        status before checking for failure, set `refresh_status=True`. If the job has failed, this
364        method will raise an `AirbyteConnectionSyncError`.
365
366        Otherwise, do nothing.
367        """
368        if not refresh_status and self._latest_job_info:
369            latest_status = self._latest_job_info.status
370        else:
371            latest_status = self.get_job_status()
372
373        if latest_status in FAILED_STATUSES:
374            raise AirbyteConnectionSyncError(
375                workspace=self.workspace,
376                connection_id=self.connection.connection_id,
377                job_id=self.job_id,
378                job_status=self.get_job_status(),
379            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
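
A minimal sketch (assuming an existing `sync_result` object, and assuming `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`):

```python
from airbyte.exceptions import AirbyteConnectionSyncError  # assumed import path

try:
    # Refresh the status first, then raise if the job has failed:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as exc:
    print(f"Sync failed: {exc}")
```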

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
381    def wait_for_completion(
382        self,
383        *,
384        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
385        raise_timeout: bool = True,
386        raise_failure: bool = False,
387    ) -> JobStatusEnum:
388        """Wait for a job to finish running."""
389        start_time = time.time()
390        while True:
391            latest_status = self.get_job_status()
392            if latest_status in FINAL_STATUSES:
393                if raise_failure:
394                    # No-op if the job succeeded or is still running:
395                    self.raise_failure_status()
396
397                return latest_status
398
399            if time.time() - start_time > wait_timeout:
400                if raise_timeout:
401                    raise AirbyteConnectionSyncTimeoutError(
402                        workspace=self.workspace,
403                        connection_id=self.connection.connection_id,
404                        job_id=self.job_id,
405                        job_status=latest_status,
406                        timeout=wait_timeout,
407                    )
408
409                return latest_status  # This will be a non-final status
410
411            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
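
A minimal sketch (assuming an existing `sync_result` object; `JobStatusEnum` is exported from `airbyte.cloud`):

```python
from airbyte.cloud import JobStatusEnum

# Poll until the job finishes; on timeout, return the non-final status
# instead of raising:
status = sync_result.wait_for_completion(
    wait_timeout=1800,
    raise_timeout=False,
)
if status != JobStatusEnum.SUCCEEDED:
    print(f"Job did not succeed within the timeout: {status}")
```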

def get_sql_cache(self) -> airbyte.caches.CacheBase:
413    def get_sql_cache(self) -> CacheBase:
414        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
415        if self._cache:
416            return self._cache
417
418        destination_configuration = self._get_destination_configuration()
419        self._cache = destination_to_cache(destination_configuration=destination_configuration)
420        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
422    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
423        """Return a SQL Engine for querying a SQL-based destination."""
424        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
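
A minimal sketch (assuming an existing `sync_result` object against a supported SQL destination; the "users" stream name is illustrative):

```python
import sqlalchemy

# Query the destination directly via SQLAlchemy:
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")  # stream name is illustrative
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(f"{table_name} contains {row_count} rows")
```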

def get_sql_table_name(self, stream_name: str) -> str:
426    def get_sql_table_name(self, stream_name: str) -> str:
427        """Return the SQL table name of the named stream."""
428        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
430    def get_sql_table(
431        self,
432        stream_name: str,
433    ) -> sqlalchemy.Table:
434        """Return a SQLAlchemy table object for the named stream."""
435        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
437    def get_dataset(self, stream_name: str) -> CachedDataset:
438        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
439
440        This can be used to read and analyze the data in a SQL-based destination.
441
442        TODO: In a future iteration, we can consider providing stream configuration information
443              (catalog information) to the `CachedDataset` object via the "Get stream properties"
444              API: https://reference.airbyte.com/reference/getstreamproperties
445        """
446        return CachedDataset(
447            self.get_sql_cache(),
448            stream_name=stream_name,
449            stream_configuration=False,  # Don't look for stream configuration in cache.
450        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
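
A minimal sketch (assuming an existing `sync_result` object against a supported SQL destination, and assuming `CachedDataset.to_pandas()` is available, as on other PyAirbyte datasets; the "users" stream name is illustrative):

```python
# Load the "users" stream into a pandas DataFrame for analysis:
dataset = sync_result.get_dataset("users")
df = dataset.to_pandas()  # assumed helper on CachedDataset
print(df.head())
```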

def get_sql_database_name(self) -> str:
452    def get_sql_database_name(self) -> str:
453        """Return the SQL database name."""
454        cache = self.get_sql_cache()
455        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
457    def get_sql_schema_name(self) -> str:
458        """Return the SQL schema name."""
459        cache = self.get_sql_cache()
460        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
462    @property
463    def stream_names(self) -> list[str]:
464        """Return the list of stream names."""
465        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
467    @final
468    @property
469    def streams(
470        self,
471    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
472        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
473
474        This is a convenience wrapper around the `stream_names`
475        property and `get_dataset()` method.
476        """
477        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
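
A minimal sketch (assuming an existing `sync_result` object against a supported SQL destination):

```python
# `streams` behaves like a read-only mapping of stream name -> CachedDataset:
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {dataset.to_sql_table().name}")
```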

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of the possible statuses of an Airbyte Cloud sync job.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
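
A minimal sketch (assuming an existing `sync_result` object; `JobStatusEnum` is exported from `airbyte.cloud`):

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")

# Because the enum subclasses `str`, members compare equal to raw strings:
assert JobStatusEnum.FAILED == "failed"
```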