airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Sync Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently this is supported in Snowflake and BigQuery only.

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source.get_config().copy()
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination.get_config().copy()
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace."""
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace."""
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace."""
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
client_id: airbyte.secrets.SecretString
client_secret: airbyte.secrets.SecretString
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

The URL of the workspace.

def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
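
For illustration, a minimal sketch of using connect() as an upfront credentials check; the workspace ID and secret names are placeholders:

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",  # Placeholder workspace ID
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Raises if the workspace is unreachable or the credentials are wrong;
# prints a confirmation message otherwise.
workspace.connect()
```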

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source.get_config().copy()
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
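
For illustration, a hedged sketch of deploying a locally configured source; the connector, config values, and display name are placeholders:

```python
import airbyte as ab

# Configure a source locally, then deploy it to the workspace.
source = ab.get_source(
    "source-faker",
    config={"count": 100},  # Placeholder configuration
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",  # Placeholder display name
    source=source,
    unique=True,             # Raise AirbyteDuplicateResourcesError on a name collision
    random_name_suffix=False,
)
print(cloud_source.connector_id)
```
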
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination.get_config().copy()
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
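
As a sketch, deploying from a plain configuration dictionary; note that destinationType is required in this form, and the config values shown are placeholders:

```python
# A raw config dict must include `destinationType`, otherwise a
# PyAirbyteInputError is raised before any API call is made.
destination_config = {
    "destinationType": "duckdb",            # Required when passing a dict
    "destination_path": "/tmp/out.duckdb",  # Placeholder config value
}
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",           # Placeholder display name
    destination=destination_config,
)
```
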
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

Delete a deployed destination from the workspace.

You can pass either a deployed CloudDestination object or the destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
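
A short sketch tying the pieces together, assuming `cloud_source` and `cloud_destination` were deployed as above; the names and stream list are placeholders:

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",  # Placeholder connection name
    source=cloud_source,                # Or pass the source ID as a str
    destination=cloud_destination,      # Or pass the destination ID as a str
    selected_streams=["users"],         # Required: at least one stream name
    table_prefix="pyab_",               # Optional prefix for destination tables
)
print(connection.connection_url)
```
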
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.
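
For example, a sketch of deleting a connection by ID along with its source and destination; the ID is a placeholder:

```python
workspace.permanently_delete_connection(
    connection="456",                 # Connection ID or CloudConnection object
    cascade_delete_source=True,       # Also delete the attached source
    cascade_delete_destination=True,  # Also delete the attached destination
)
```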

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace."""
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

List connections by name in the workspace.
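
A minimal sketch of the two filtering modes; the names are placeholders:

```python
# Exact-name match:
connections = workspace.list_connections(name="faker-to-duckdb")

# Or a custom predicate applied to connection names:
connections = workspace.list_connections(
    name_filter=lambda n: n.startswith("faker-"),
)
for connection in connections:
    print(connection.connection_id)
```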

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace."""
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

List all sources in the workspace.

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace."""
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

List all destinations in the workspace.

class CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

Get the source object.

destination_id: str
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The URL to the connection.

job_history_url: str | None
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

Run a sync.
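
For illustration, both blocking and non-blocking usage; the timeout value is a placeholder:

```python
# Block until the sync finishes (the default), raising on failure or timeout:
sync_result = connection.run_sync(wait=True, wait_timeout=600)

# Or start the job and poll for completion later:
sync_result = connection.run_sync(wait=False)
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,  # Return the non-final status instead of raising
)
```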

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

Get the previous sync logs for a connection.
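
A minimal sketch of inspecting recent jobs for a connection:

```python
# Print the ID, status, and URL of the last five sync jobs:
for result in connection.get_previous_sync_logs(limit=5):
    print(result.job_id, result.get_job_status(), result.job_url)
```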

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
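
For example, a sketch of both call forms; the job ID is a placeholder:

```python
# Latest job, if any:
latest = connection.get_sync_result()
if latest is not None:
    print(latest.get_job_status())

# Or a specific job by ID (lazily loaded; no API call until needed):
older = connection.get_sync_result(job_id=12345)  # Placeholder job ID
```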

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
@dataclass
class SyncResult:
129@dataclass
130class SyncResult:
131    """The result of a sync operation.
132
133    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
134    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
135    """
136
137    workspace: CloudWorkspace
138    connection: CloudConnection
139    job_id: int
140    table_name_prefix: str = ""
141    table_name_suffix: str = ""
142    _latest_job_info: JobResponse | None = None
143    _connection_response: ConnectionResponse | None = None
144    _cache: CacheBase | None = None
145
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job."""
149        return f"{self.connection.job_history_url}/{self.job_id}"
150
151    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
152        """Return connection info for the sync job."""
153        if self._connection_response and not force_refresh:
154            return self._connection_response
155
156        self._connection_response = api_util.get_connection(
157            workspace_id=self.workspace.workspace_id,
158            api_root=self.workspace.api_root,
159            connection_id=self.connection.connection_id,
160            client_id=self.workspace.client_id,
161            client_secret=self.workspace.client_secret,
162        )
163        return self._connection_response
164
165    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
166        """Return the destination configuration for the sync job."""
167        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
168        destination_response = api_util.get_destination(
169            destination_id=connection_info.destination_id,
170            api_root=self.workspace.api_root,
171            client_id=self.workspace.client_id,
172            client_secret=self.workspace.client_secret,
173        )
174        return asdict(destination_response.configuration)
175
176    def is_job_complete(self) -> bool:
177        """Check if the sync job is complete."""
178        return self.get_job_status() in FINAL_STATUSES
179
180    def get_job_status(self) -> JobStatusEnum:
181        """Check if the sync job is still running."""
182        return self._fetch_latest_job_info().status
183
184    def _fetch_latest_job_info(self) -> JobResponse:
185        """Return the job info for the sync job."""
186        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
187            return self._latest_job_info
188
189        self._latest_job_info = api_util.get_job_info(
190            job_id=self.job_id,
191            api_root=self.workspace.api_root,
192            client_id=self.workspace.client_id,
193            client_secret=self.workspace.client_secret,
194        )
195        return self._latest_job_info
196
197    @property
198    def bytes_synced(self) -> int:
199        """Return the number of records processed."""
200        return self._fetch_latest_job_info().bytes_synced or 0
201
202    @property
203    def records_synced(self) -> int:
204        """Return the number of records processed."""
205        return self._fetch_latest_job_info().rows_synced or 0
206
207    @property
208    def start_time(self) -> datetime:
209        """Return the start time of the sync job in UTC."""
210        # Parse from ISO 8601 format:
211        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
212
213    def raise_failure_status(
214        self,
215        *,
216        refresh_status: bool = False,
217    ) -> None:
218        """Raise an exception if the sync job failed.
219
220        By default, this method will use the latest status available. If you want to refresh the
221        status before checking for failure, set `refresh_status=True`. If the job has failed, this
222        method will raise a `AirbyteConnectionSyncError`.
223
224        Otherwise, do nothing.
225        """
226        if not refresh_status and self._latest_job_info:
227            latest_status = self._latest_job_info.status
228        else:
229            latest_status = self.get_job_status()
230
231        if latest_status in FAILED_STATUSES:
232            raise AirbyteConnectionSyncError(
233                workspace=self.workspace,
234                connection_id=self.connection.connection_id,
235                job_id=self.job_id,
236                job_status=self.get_job_status(),
237            )
238
239    def wait_for_completion(
240        self,
241        *,
242        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
243        raise_timeout: bool = True,
244        raise_failure: bool = False,
245    ) -> JobStatusEnum:
246        """Wait for a job to finish running."""
247        start_time = time.time()
248        while True:
249            latest_status = self.get_job_status()
250            if latest_status in FINAL_STATUSES:
251                if raise_failure:
252                    # No-op if the job succeeded or is still running:
253                    self.raise_failure_status()
254
255                return latest_status
256
257            if time.time() - start_time > wait_timeout:
258                if raise_timeout:
259                    raise AirbyteConnectionSyncTimeoutError(
260                        workspace=self.workspace,
261                        connection_id=self.connection.connection_id,
262                        job_id=self.job_id,
263                        job_status=latest_status,
264                        timeout=wait_timeout,
265                    )
266
267                return latest_status  # This will be a non-final status
268
269            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
270
271    def get_sql_cache(self) -> CacheBase:
272        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
273        if self._cache:
274            return self._cache
275
276        destination_configuration = self._get_destination_configuration()
277        self._cache = destination_to_cache(destination_configuration=destination_configuration)
278        return self._cache
279
280    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
281        """Return a SQL Engine for querying a SQL-based destination."""
282        return self.get_sql_cache().get_sql_engine()
283
284    def get_sql_table_name(self, stream_name: str) -> str:
285        """Return the SQL table name of the named stream."""
286        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
287
288    def get_sql_table(
289        self,
290        stream_name: str,
291    ) -> sqlalchemy.Table:
292        """Return a SQLAlchemy table object for the named stream."""
293        return self.get_sql_cache().processor.get_sql_table(stream_name)
294
295    def get_dataset(self, stream_name: str) -> CachedDataset:
296        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
297
298        This can be used to read and analyze the data in a SQL-based destination.
299
300        TODO: In a future iteration, we can consider providing stream configuration information
301              (catalog information) to the `CachedDataset` object via the "Get stream properties"
302              API: https://reference.airbyte.com/reference/getstreamproperties
303        """
304        return CachedDataset(
305            self.get_sql_cache(),
306            stream_name=stream_name,
307            stream_configuration=False,  # Don't look for stream configuration in cache.
308        )
309
310    def get_sql_database_name(self) -> str:
311        """Return the SQL database name."""
312        cache = self.get_sql_cache()
313        return cache.get_database_name()
314
315    def get_sql_schema_name(self) -> str:
316        """Return the SQL schema name."""
317        cache = self.get_sql_cache()
318        return cache.schema_name
319
320    @property
321    def stream_names(self) -> list[str]:
322        """Return the set of stream names."""
323        return self.connection.stream_names
324
325    @final
326    @property
327    def streams(
328        self,
329    ) -> _SyncResultStreams:
330        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
331
332        This is a convenience wrapper around the `stream_names`
333        property and `get_dataset()` method.
334        """
335        return self._SyncResultStreams(self)
336
337    class _SyncResultStreams(Mapping[str, CachedDataset]):
338        """A mapping of stream names to cached datasets."""
339
340        def __init__(
341            self,
342            parent: SyncResult,
343            /,
344        ) -> None:
345            self.parent: SyncResult = parent
346
347        def __getitem__(self, key: str) -> CachedDataset:
348            return self.parent.get_dataset(stream_name=key)
349
350        def __iter__(self) -> Iterator[str]:
351            return iter(self.parent.stream_names)
352
353        def __len__(self) -> int:
354            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult(
    workspace: CloudWorkspace,
    connection: CloudConnection,
    job_id: int,
    table_name_prefix: str = '',
    table_name_suffix: str = '',
    _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None,
    _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None,
    _cache: airbyte.caches.CacheBase | None = None,
)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job."""
149        return f"{self.connection.job_history_url}/{self.job_id}"

Return the URL of the sync job.
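
A one-liner sketch, assuming a `sync_result` obtained from a `CloudConnection`:

```python
# Assuming `sync_result` is a SyncResult from a CloudConnection:
print(f"View this job in the Airbyte UI: {sync_result.job_url}")
```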

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Return the latest status of the sync job.
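
A short polling sketch combining `is_job_complete()` and `get_job_status()` (assuming an existing `sync_result`):

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if not sync_result.is_job_complete():
    print(f"Job is still in progress: {status}")
elif status == JobStatusEnum.SUCCEEDED:
    print("Job finished successfully.")
else:
    print(f"Job finished with status: {status}")
```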

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records synced.
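
Both counters are read from the latest job info and fall back to 0 when the API reports no value:

```python
# Assuming a completed `sync_result`:
print(f"Synced {sync_result.records_synced:,} records "
      f"({sync_result.bytes_synced:,} bytes).")
```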

start_time: datetime.datetime

Return the start time of the sync job in UTC.
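
For example, to report how long ago the job started. This sketch assumes the API returns a timezone-aware ISO 8601 timestamp, so the subtraction below is valid:

```python
from datetime import datetime, timezone

# Assumes `sync_result.start_time` is timezone-aware (UTC):
elapsed = datetime.now(timezone.utc) - sync_result.start_time
print(f"Job started {elapsed.total_seconds():.0f} seconds ago.")
```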

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
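
A minimal sketch of handling the failure case, assuming `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync job {sync_result.job_id} failed: {ex}")
```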

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Wait for a job to finish running.
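
For example, to wait up to ten minutes, raise if the job fails, but return the current (possibly non-final) status instead of raising on timeout:

```python
final_status = sync_result.wait_for_completion(
    wait_timeout=600,     # Give up after 10 minutes
    raise_timeout=False,  # Return the current status instead of raising
    raise_failure=True,   # Raise AirbyteConnectionSyncError on failure
)
print(f"Status after waiting: {final_status}")
```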

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL Cache object for working with the data in a SQL-based destination.
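
The cache is built from the destination configuration on first call and memoized, so repeated calls return the same object:

```python
cache = sync_result.get_sql_cache()
assert cache is sync_result.get_sql_cache()  # Memoized on the SyncResult
```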

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.
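
The resolved name (which may reflect a configured table prefix or suffix) pairs naturally with `get_sql_engine()` for ad-hoc SQL. A sketch, assuming a hypothetical `users` stream in a supported SQL destination:

```python
import sqlalchemy

table_name = sync_result.get_sql_table_name("users")  # Hypothetical stream

with sync_result.get_sql_engine().connect() as conn:
    count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar_one()
print(f"{table_name} contains {count} rows.")
```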

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.
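
Or, staying in SQLAlchemy Core rather than raw SQL (again assuming a hypothetical `users` stream):

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")
stmt = sqlalchemy.select(users_table).limit(10)

with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```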

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
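
A short sketch, assuming a hypothetical `users` stream and using the dataset's pandas conversion:

```python
dataset = sync_result.get_dataset("users")  # Hypothetical stream name

# Convert to a pandas DataFrame for analysis:
df = dataset.to_pandas()
print(df.describe())
```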

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.
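
Together with `get_sql_database_name()`, this tells you where the synced tables live:

```python
database = sync_result.get_sql_database_name()
schema = sync_result.get_sql_schema_name()
print(f"Synced tables live in {database}.{schema}")
```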

stream_names: list[str]

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
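
Because `streams` implements the `Mapping` protocol, it supports the usual read-only dictionary operations:

```python
# List all stream names:
print(list(sync_result.streams))

# Look up a dataset by stream name (equivalent to get_dataset("users")):
users = sync_result.streams["users"]  # Hypothetical stream name
```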

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

The possible statuses of an Airbyte Cloud sync job.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
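
Because the enum also subclasses `str`, members compare equal to their raw string values, which is handy when checking statuses returned by the API:

```python
from airbyte.cloud import JobStatusEnum

status = JobStatusEnum("succeeded")      # Parse from a raw API value
assert status == JobStatusEnum.SUCCEEDED
assert status == "succeeded"             # str-enum members equal their values
```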