airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Usage Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently this is supported in Snowflake and BigQuery only.

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)

ℹ️ Experimental Features

You can use the airbyte.cloud.experimental module to access experimental features. These additional features are subject to change and may not be available in all environments.
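
For example, opting in is just an import (the APIs inside this module may change without notice):

# Opt in to experimental features
from airbyte.cloud import experimental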

 1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
 3
 4You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.
 5
 6## Examples
 7
 8### Basic Usage Example:
 9
10```python
11import airbyte as ab
12from airbyte import cloud
13
14# Initialize an Airbyte Cloud workspace object
15workspace = cloud.CloudWorkspace(
16    workspace_id="123",
17    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
18    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
19)
20
21# Run a sync job on Airbyte Cloud
22connection = workspace.get_connection(connection_id="456")
23sync_result = connection.run_sync()
24print(sync_result.get_job_status())
25```
26
27### Example Read From Cloud Destination:
28
29If your destination is supported, you can read records directly from the
30`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.
31
32
33```python
34# Assuming we've already created a `connection` object...
35
36# Get the latest job result and print the stream names
37sync_result = connection.get_sync_result()
38print(sync_result.stream_names)
39
40# Get a dataset from the sync result
41dataset: CachedDataset = sync_result.get_dataset("users")
42
43# Get a SQLAlchemy table to use in SQL queries...
44users_table = dataset.to_sql_table()
45print(f"Table name: {users_table.name}")
46
47# Or iterate over the dataset directly
48for record in dataset:
49    print(record)
50```
51
52ℹ️ **Experimental Features**
53
54You can use the `airbyte.cloud.experimental` module to access experimental features.
55These additional features are subject to change and may not be available in all environments.
56"""  # noqa: RUF002  # Allow emoji
57
58from __future__ import annotations
59
60from typing import TYPE_CHECKING
61
62from airbyte.cloud.connections import CloudConnection
63from airbyte.cloud.constants import JobStatusEnum
64from airbyte.cloud.sync_results import SyncResult
65from airbyte.cloud.workspaces import CloudWorkspace
66
67
68# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
69if TYPE_CHECKING:
70    # ruff: noqa: TC004
71    from airbyte.cloud import connections, constants, sync_results, workspaces
72
73
74__all__ = [
75    # Submodules
76    "workspaces",
77    "connections",
78    "constants",
79    "sync_results",
80    # Classes
81    "CloudWorkspace",
82    "CloudConnection",
83    "SyncResult",
84    # Enums
85    "JobStatusEnum",
86]
@dataclass
class CloudWorkspace:
 28@dataclass
 29class CloudWorkspace:
 30    """A remote workspace on the Airbyte Cloud.
 31
 32    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 33    instances, both OSS and Enterprise.
 34    """
 35
 36    workspace_id: str
 37    client_id: SecretString
 38    client_secret: SecretString
 39    api_root: str = api_util.CLOUD_API_ROOT
 40
 41    def __post_init__(self) -> None:
 42        """Ensure that the client ID and secret are handled securely."""
 43        self.client_id = SecretString(self.client_id)
 44        self.client_secret = SecretString(self.client_secret)
 45
 46    @property
 47    def workspace_url(self) -> str | None:
 48        """The URL of the workspace."""
 49        return f"{self.api_root}/workspaces/{self.workspace_id}"
 50
 51    # Test connection and creds
 52
 53    def connect(self) -> None:
 54        """Check that the workspace is reachable and raise an exception otherwise.
 55
 56        Note: It is not necessary to call this method before calling other operations. It
 57              serves primarily as a simple check to ensure that the workspace is reachable
 58              and credentials are correct.
 59        """
 60        _ = api_util.get_workspace(
 61            api_root=self.api_root,
 62            workspace_id=self.workspace_id,
 63            client_id=self.client_id,
 64            client_secret=self.client_secret,
 65        )
 66        print(f"Successfully connected to workspace: {self.workspace_url}")
 67
 68    # Deploy sources and destinations
 69
 70    def deploy_source(
 71        self,
 72        name: str,
 73        source: Source,
 74        *,
 75        unique: bool = True,
 76        random_name_suffix: bool = False,
 77    ) -> CloudSource:
 78        """Deploy a source to the workspace.
 79
 80        Returns the newly deployed source.
 81
 82        Args:
 83            name: The name to use when deploying.
 84            source: The source object to deploy.
 85            unique: Whether to require a unique name. If `True`, duplicate names
 86                are not allowed. Defaults to `True`.
 87            random_name_suffix: Whether to append a random suffix to the name.
 88        """
 89        source_config_dict = source.get_config().copy()
 90        source_config_dict["sourceType"] = source.name.replace("source-", "")
 91
 92        if random_name_suffix:
 93            name += f" (ID: {text_util.generate_random_suffix()})"
 94
 95        if unique:
 96            existing = self.list_sources(name=name)
 97            if existing:
 98                raise exc.AirbyteDuplicateResourcesError(
 99                    resource_type="source",
100                    resource_name=name,
101                )
102
103        deployed_source = api_util.create_source(
104            name=name,
105            api_root=self.api_root,
106            workspace_id=self.workspace_id,
107            config=source_config_dict,
108            client_id=self.client_id,
109            client_secret=self.client_secret,
110        )
111        return CloudSource(
112            workspace=self,
113            connector_id=deployed_source.source_id,
114        )
115
116    def deploy_destination(
117        self,
118        name: str,
119        destination: Destination | dict[str, Any],
120        *,
121        unique: bool = True,
122        random_name_suffix: bool = False,
123    ) -> CloudDestination:
124        """Deploy a destination to the workspace.
125
 126        Returns the newly deployed destination.
127
128        Args:
129            name: The name to use when deploying.
130            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
131                dictionary of configuration values.
132            unique: Whether to require a unique name. If `True`, duplicate names
133                are not allowed. Defaults to `True`.
134            random_name_suffix: Whether to append a random suffix to the name.
135        """
136        if isinstance(destination, Destination):
137            destination_conf_dict = destination.get_config().copy()
138            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
139            # raise ValueError(destination_conf_dict)
140        else:
141            destination_conf_dict = destination.copy()
142            if "destinationType" not in destination_conf_dict:
143                raise exc.PyAirbyteInputError(
144                    message="Missing `destinationType` in configuration dictionary.",
145                )
146
147        if random_name_suffix:
148            name += f" (ID: {text_util.generate_random_suffix()})"
149
150        if unique:
151            existing = self.list_destinations(name=name)
152            if existing:
153                raise exc.AirbyteDuplicateResourcesError(
154                    resource_type="destination",
155                    resource_name=name,
156                )
157
158        deployed_destination = api_util.create_destination(
159            name=name,
160            api_root=self.api_root,
161            workspace_id=self.workspace_id,
162            config=destination_conf_dict,  # Wants a dataclass but accepts dict
163            client_id=self.client_id,
164            client_secret=self.client_secret,
165        )
166        return CloudDestination(
167            workspace=self,
168            connector_id=deployed_destination.destination_id,
169        )
170
171    def permanently_delete_source(
172        self,
173        source: str | CloudSource,
174    ) -> None:
175        """Delete a source from the workspace.
176
 177        You can pass either the source ID `str` or a deployed `CloudSource` object.
178        """
179        if not isinstance(source, (str, CloudSource)):
180            raise exc.PyAirbyteInputError(
181                message="Invalid source type.",
182                input_value=type(source).__name__,
183            )
184
185        api_util.delete_source(
186            source_id=source.connector_id if isinstance(source, CloudSource) else source,
187            api_root=self.api_root,
188            client_id=self.client_id,
189            client_secret=self.client_secret,
190        )
191
192    # Deploy and delete destinations
193
194    def permanently_delete_destination(
195        self,
196        destination: str | CloudDestination,
197    ) -> None:
198        """Delete a deployed destination from the workspace.
199
 200        You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
201        """
202        if not isinstance(destination, (str, CloudDestination)):
203            raise exc.PyAirbyteInputError(
204                message="Invalid destination type.",
205                input_value=type(destination).__name__,
206            )
207
208        api_util.delete_destination(
209            destination_id=(
210                destination if isinstance(destination, str) else destination.destination_id
211            ),
212            api_root=self.api_root,
213            client_id=self.client_id,
214            client_secret=self.client_secret,
215        )
216
217    # Deploy and delete connections
218
219    def deploy_connection(
220        self,
221        connection_name: str,
222        *,
223        source: CloudSource | str,
224        selected_streams: list[str],
225        destination: CloudDestination | str,
226        table_prefix: str | None = None,
227    ) -> CloudConnection:
228        """Create a new connection between an already deployed source and destination.
229
230        Returns the newly deployed connection object.
231
232        Args:
233            connection_name: The name of the connection.
234            source: The deployed source. You can pass a source ID or a CloudSource object.
235            destination: The deployed destination. You can pass a destination ID or a
236                CloudDestination object.
237            table_prefix: Optional. The table prefix to use when syncing to the destination.
238            selected_streams: The selected stream names to sync within the connection.
239        """
240        if not selected_streams:
241            raise exc.PyAirbyteInputError(
242                guidance="You must provide `selected_streams` when creating a connection."
243            )
244
245        source_id: str = source if isinstance(source, str) else source.connector_id
246        destination_id: str = (
247            destination if isinstance(destination, str) else destination.connector_id
248        )
249
250        deployed_connection = api_util.create_connection(
251            name=connection_name,
252            source_id=source_id,
253            destination_id=destination_id,
254            api_root=self.api_root,
255            workspace_id=self.workspace_id,
256            selected_stream_names=selected_streams,
257            prefix=table_prefix or "",
258            client_id=self.client_id,
259            client_secret=self.client_secret,
260        )
261
262        return CloudConnection(
263            workspace=self,
264            connection_id=deployed_connection.connection_id,
265            source=deployed_connection.source_id,
266            destination=deployed_connection.destination_id,
267        )
268
269    def get_connection(
270        self,
271        connection_id: str,
272    ) -> CloudConnection:
273        """Get a connection by ID.
274
275        This method does not fetch data from the API. It returns a `CloudConnection` object,
276        which will be loaded lazily as needed.
277        """
278        return CloudConnection(
279            workspace=self,
280            connection_id=connection_id,
281        )
282
283    def permanently_delete_connection(
284        self,
285        connection: str | CloudConnection,
286        *,
287        cascade_delete_source: bool = False,
288        cascade_delete_destination: bool = False,
289    ) -> None:
290        """Delete a deployed connection from the workspace."""
291        if connection is None:
292            raise ValueError("No connection ID provided.")
293
294        if isinstance(connection, str):
295            connection = CloudConnection(
296                workspace=self,
297                connection_id=connection,
298            )
299
300        api_util.delete_connection(
301            connection_id=connection.connection_id,
302            api_root=self.api_root,
303            workspace_id=self.workspace_id,
304            client_id=self.client_id,
305            client_secret=self.client_secret,
306        )
307
308        if cascade_delete_source:
309            self.permanently_delete_source(source=connection.source_id)
310        if cascade_delete_destination:
311            self.permanently_delete_destination(destination=connection.destination_id)
312
313    # List sources, destinations, and connections
314
315    def list_connections(
316        self,
317        name: str | None = None,
318        *,
319        name_filter: Callable | None = None,
320    ) -> list[CloudConnection]:
321        """List connections in the workspace, optionally filtering by name."""
322        connections = api_util.list_connections(
323            api_root=self.api_root,
324            workspace_id=self.workspace_id,
325            name=name,
326            name_filter=name_filter,
327            client_id=self.client_id,
328            client_secret=self.client_secret,
329        )
330        return [
331            CloudConnection(
332                workspace=self,
333                connection_id=connection.connection_id,
334                source=None,
335                destination=None,
336            )
337            for connection in connections
338            if name is None or connection.name == name
339        ]
340
341    def list_sources(
342        self,
343        name: str | None = None,
344        *,
345        name_filter: Callable | None = None,
346    ) -> list[CloudSource]:
347        """List all sources in the workspace."""
348        sources = api_util.list_sources(
349            api_root=self.api_root,
350            workspace_id=self.workspace_id,
351            name=name,
352            name_filter=name_filter,
353            client_id=self.client_id,
354            client_secret=self.client_secret,
355        )
356        return [
357            CloudSource(
358                workspace=self,
359                connector_id=source.source_id,
360            )
361            for source in sources
362            if name is None or source.name == name
363        ]
364
365    def list_destinations(
366        self,
367        name: str | None = None,
368        *,
369        name_filter: Callable | None = None,
370    ) -> list[CloudDestination]:
371        """List all destinations in the workspace."""
372        destinations = api_util.list_destinations(
373            api_root=self.api_root,
374            workspace_id=self.workspace_id,
375            name=name,
376            name_filter=name_filter,
377            client_id=self.client_id,
378            client_secret=self.client_secret,
379        )
380        return [
381            CloudDestination(
382                workspace=self,
383                connector_id=destination.destination_id,
384            )
385            for destination in destinations
386            if name is None or destination.name == name
387        ]

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
client_id: airbyte.secrets.SecretString
client_secret: airbyte.secrets.SecretString
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None

The URL of the workspace.

def connect(self) -> None:

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
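
For example, you can use it as a quick credentials check before doing any real work (reusing the workspace object from the module example above):

# Raises an exception if the workspace is unreachable or the credentials are wrong
workspace.connect()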

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
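
A minimal usage sketch, assuming the workspace object from the module example; the connector name and config values are illustrative:

import airbyte as ab

# Configure a local source object, then deploy it to the workspace
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
)
print(cloud_source.connector_id)
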
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
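
For example, deploying from a plain configuration dictionary, which must include the destinationType key (the connector type and config values shown are illustrative):

cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",
        "destination_path": "/local/airbyte.duckdb",
    },
)
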
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:

Delete a deployed destination from the workspace.

You can pass either a deployed CloudDestination object or the destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
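
Putting the pieces together, assuming cloud_source and cloud_destination were deployed as in the sketches above (the stream names are illustrative):

connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,
    destination=cloud_destination,
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
print(connection.connection_id)
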
def get_connection(self, connection_id: str) -> CloudConnection:

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:

Delete a deployed connection from the workspace.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:

List connections in the workspace, optionally filtering by name.
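
For example, assuming the name_filter callable receives each connection name as a string:

# Find all connections whose names start with "Prod "
prod_connections = workspace.list_connections(
    name_filter=lambda name: name.startswith("Prod "),
)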

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:

List all sources in the workspace.

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:

List all destinations in the workspace.

class CloudConnection:
 20class CloudConnection:
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(self) -> ConnectionResponse:
 59        """Populate the connection with data from the API."""
 60        return api_util.get_connection(
 61            workspace_id=self.workspace.workspace_id,
 62            connection_id=self.connection_id,
 63            api_root=self.workspace.api_root,
 64            client_id=self.workspace.client_id,
 65            client_secret=self.workspace.client_secret,
 66        )
 67
 68    # Properties
 69
 70    @property
 71    def source_id(self) -> str:
 72        """The ID of the source."""
 73        if not self._source_id:
 74            if not self._connection_info:
 75                self._connection_info = self._fetch_connection_info()
 76
 77            self._source_id = self._connection_info.source_id
 78
 79        return cast("str", self._source_id)
 80
 81    @property
 82    def source(self) -> CloudSource:
 83        """Get the source object."""
 84        if self._cloud_source_object:
 85            return self._cloud_source_object
 86
 87        self._cloud_source_object = CloudSource(
 88            workspace=self.workspace,
 89            connector_id=self.source_id,
 90        )
 91        return self._cloud_source_object
 92
 93    @property
 94    def destination_id(self) -> str:
 95        """The ID of the destination."""
 96        if not self._destination_id:
 97            if not self._connection_info:
 98                self._connection_info = self._fetch_connection_info()
 99
100            self._destination_id = self._connection_info.destination_id
101
102        return cast("str", self._destination_id)
103
104    @property
105    def destination(self) -> CloudDestination:
106        """Get the destination object."""
107        if self._cloud_destination_object:
108            return self._cloud_destination_object
109
110        self._cloud_destination_object = CloudDestination(
111            workspace=self.workspace,
112            connector_id=self.destination_id,
113        )
114        return self._cloud_destination_object
115
116    @property
117    def stream_names(self) -> list[str]:
118        """The stream names."""
119        if not self._connection_info:
120            self._connection_info = self._fetch_connection_info()
121
122        return [stream.name for stream in self._connection_info.configurations.streams or []]
123
124    @property
125    def table_prefix(self) -> str:
126        """The table prefix."""
127        if not self._connection_info:
128            self._connection_info = self._fetch_connection_info()
129
130        return self._connection_info.prefix or ""
131
132    @property
133    def connection_url(self) -> str | None:
134        """The URL to the connection."""
135        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
136
137    @property
138    def job_history_url(self) -> str | None:
139        """The URL to the job history for the connection."""
140        return f"{self.connection_url}/job-history"
141
142    # Run Sync
143
144    def run_sync(
145        self,
146        *,
147        wait: bool = True,
148        wait_timeout: int = 300,
149    ) -> SyncResult:
150        """Run a sync."""
151        connection_response = api_util.run_connection(
152            connection_id=self.connection_id,
153            api_root=self.workspace.api_root,
154            workspace_id=self.workspace.workspace_id,
155            client_id=self.workspace.client_id,
156            client_secret=self.workspace.client_secret,
157        )
158        sync_result = SyncResult(
159            workspace=self.workspace,
160            connection=self,
161            job_id=connection_response.job_id,
162        )
163
164        if wait:
165            sync_result.wait_for_completion(
166                wait_timeout=wait_timeout,
167                raise_failure=True,
168                raise_timeout=True,
169            )
170
171        return sync_result
172
173    # Logs
174
175    def get_previous_sync_logs(
176        self,
177        *,
178        limit: int = 10,
179    ) -> list[SyncResult]:
180        """Get the previous sync logs for a connection."""
181        sync_logs: list[JobResponse] = api_util.get_job_logs(
182            connection_id=self.connection_id,
183            api_root=self.workspace.api_root,
184            workspace_id=self.workspace.workspace_id,
185            limit=limit,
186            client_id=self.workspace.client_id,
187            client_secret=self.workspace.client_secret,
188        )
189        return [
190            SyncResult(
191                workspace=self.workspace,
192                connection=self,
193                job_id=sync_log.job_id,
194                _latest_job_info=sync_log,
195            )
196            for sync_log in sync_logs
197        ]
198
199    def get_sync_result(
200        self,
201        job_id: int | None = None,
202    ) -> SyncResult | None:
203        """Get the sync result for the connection.
204
205        If `job_id` is not provided, the most recent sync job will be used.
206
207        Returns `None` if job_id is omitted and no previous jobs are found.
208        """
209        if job_id is None:
210            # Get the most recent sync job
211            results = self.get_previous_sync_logs(
212                limit=1,
213            )
214            if results:
215                return results[0]
216
217            return None
218
219        # Get the sync job by ID (lazy loaded)
220        return SyncResult(
221            workspace=self.workspace,
222            connection=self,
223            job_id=job_id,
224        )
225
226    # Deletions
227
228    def permanently_delete(
229        self,
230        *,
231        cascade_delete_source: bool = False,
232        cascade_delete_destination: bool = False,
233    ) -> None:
234        """Delete the connection.
235
236        Args:
237            cascade_delete_source: Whether to also delete the source.
238            cascade_delete_destination: Whether to also delete the destination.
239        """
240        self.workspace.permanently_delete_connection(self)
241
242        if cascade_delete_source:
243            self.workspace.permanently_delete_source(self.source_id)
244
245        if cascade_delete_destination:
246            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str

The ID of the source.

source: airbyte.cloud.connectors.CloudSource

Get the source object.

destination_id: str

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination

Get the destination object.

stream_names: list[str]

The stream names.

table_prefix: str

The table prefix.

connection_url: str | None

The URL to the connection.

job_history_url: str | None

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:

Run a sync.
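
For example, to start a sync without blocking and wait for it later:

# Kick off the job but don't block
sync_result = connection.run_sync(wait=False)

# ... do other work ...

# Block until the job reaches a final status, up to 300 seconds
final_status = sync_result.wait_for_completion(wait_timeout=300)
print(final_status)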

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:

Get the previous sync logs for a connection.
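
For example, to print the status of the five most recent sync jobs:

for result in connection.get_previous_sync_logs(limit=5):
    print(f"Job {result.job_id}: {result.get_job_status()}")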

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
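
A sketch of both lookup modes (the job ID shown is illustrative):

# Most recent job, or None if the connection has never synced
latest = connection.get_sync_result()
if latest is not None:
    print(latest.get_job_status())

# A specific job by ID; the result is loaded lazily
specific = connection.get_sync_result(job_id=12345)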

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
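
For example, to tear down a connection together with its source and destination:

connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=True,
)
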
@dataclass
class SyncResult:
129@dataclass
130class SyncResult:
131    """The result of a sync operation.
132
133    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
134    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
135    """
136
137    workspace: CloudWorkspace
138    connection: CloudConnection
139    job_id: int
140    table_name_prefix: str = ""
141    table_name_suffix: str = ""
142    _latest_job_info: JobResponse | None = None
143    _connection_response: ConnectionResponse | None = None
144    _cache: CacheBase | None = None
145
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job."""
149        return f"{self.connection.job_history_url}/{self.job_id}"
150
151    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
152        """Return connection info for the sync job."""
153        if self._connection_response and not force_refresh:
154            return self._connection_response
155
156        self._connection_response = api_util.get_connection(
157            workspace_id=self.workspace.workspace_id,
158            api_root=self.workspace.api_root,
159            connection_id=self.connection.connection_id,
160            client_id=self.workspace.client_id,
161            client_secret=self.workspace.client_secret,
162        )
163        return self._connection_response
164
165    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
166        """Return the destination configuration for the sync job."""
167        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
168        destination_response = api_util.get_destination(
169            destination_id=connection_info.destination_id,
170            api_root=self.workspace.api_root,
171            client_id=self.workspace.client_id,
172            client_secret=self.workspace.client_secret,
173        )
174        return asdict(destination_response.configuration)
175
176    def is_job_complete(self) -> bool:
177        """Check if the sync job is complete."""
178        return self.get_job_status() in FINAL_STATUSES
179
180    def get_job_status(self) -> JobStatusEnum:
181        """Return the status of the sync job."""
182        return self._fetch_latest_job_info().status
183
184    def _fetch_latest_job_info(self) -> JobResponse:
185        """Return the job info for the sync job."""
186        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
187            return self._latest_job_info
188
189        self._latest_job_info = api_util.get_job_info(
190            job_id=self.job_id,
191            api_root=self.workspace.api_root,
192            client_id=self.workspace.client_id,
193            client_secret=self.workspace.client_secret,
194        )
195        return self._latest_job_info
196
197    @property
198    def bytes_synced(self) -> int:
199        """Return the number of bytes synced."""
200        return self._fetch_latest_job_info().bytes_synced or 0
201
202    @property
203    def records_synced(self) -> int:
204        """Return the number of records synced."""
205        return self._fetch_latest_job_info().rows_synced or 0
206
207    @property
208    def start_time(self) -> datetime:
209        """Return the start time of the sync job in UTC."""
210        # Parse from ISO 8601 format:
211        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
212
213    def raise_failure_status(
214        self,
215        *,
216        refresh_status: bool = False,
217    ) -> None:
218        """Raise an exception if the sync job failed.
219
220        By default, this method will use the latest status available. If you want to refresh the
221        status before checking for failure, set `refresh_status=True`. If the job has failed, this
222        method will raise an `AirbyteConnectionSyncError`. If the job has not failed,
223
224        Otherwise, do nothing.
225        """
226        if not refresh_status and self._latest_job_info:
227            latest_status = self._latest_job_info.status
228        else:
229            latest_status = self.get_job_status()
230
231        if latest_status in FAILED_STATUSES:
232            raise AirbyteConnectionSyncError(
233                workspace=self.workspace,
234                connection_id=self.connection.connection_id,
235                job_id=self.job_id,
236                job_status=self.get_job_status(),
237            )
238
239    def wait_for_completion(
240        self,
241        *,
242        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
243        raise_timeout: bool = True,
244        raise_failure: bool = False,
245    ) -> JobStatusEnum:
246        """Wait for a job to finish running."""
247        start_time = time.time()
248        while True:
249            latest_status = self.get_job_status()
250            if latest_status in FINAL_STATUSES:
251                if raise_failure:
252                    # No-op if the job succeeded or is still running:
253                    self.raise_failure_status()
254
255                return latest_status
256
257            if time.time() - start_time > wait_timeout:
258                if raise_timeout:
259                    raise AirbyteConnectionSyncTimeoutError(
260                        workspace=self.workspace,
261                        connection_id=self.connection.connection_id,
262                        job_id=self.job_id,
263                        job_status=latest_status,
264                        timeout=wait_timeout,
265                    )
266
267                return latest_status  # This will be a non-final status
268
269            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
270
271    def get_sql_cache(self) -> CacheBase:
272        """Return a SQL Cache object for working with the data in a SQL-based destination."""
273        if self._cache:
274            return self._cache
275
276        destination_configuration = self._get_destination_configuration()
277        self._cache = destination_to_cache(destination_configuration=destination_configuration)
278        return self._cache
279
280    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
281        """Return a SQL Engine for querying a SQL-based destination."""
282        return self.get_sql_cache().get_sql_engine()
283
284    def get_sql_table_name(self, stream_name: str) -> str:
285        """Return the SQL table name of the named stream."""
286        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
287
288    def get_sql_table(
289        self,
290        stream_name: str,
291    ) -> sqlalchemy.Table:
292        """Return a SQLAlchemy table object for the named stream."""
293        return self.get_sql_cache().processor.get_sql_table(stream_name)
294
295    def get_dataset(self, stream_name: str) -> CachedDataset:
296        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
297
298        This can be used to read and analyze the data in a SQL-based destination.
299
300        TODO: In a future iteration, we can consider providing stream configuration information
301              (catalog information) to the `CachedDataset` object via the "Get stream properties"
302              API: https://reference.airbyte.com/reference/getstreamproperties
303        """
304        return CachedDataset(
305            self.get_sql_cache(),
306            stream_name=stream_name,
307            stream_configuration=False,  # Don't look for stream configuration in cache.
308        )
309
310    def get_sql_database_name(self) -> str:
311        """Return the SQL database name."""
312        cache = self.get_sql_cache()
313        return cache.get_database_name()
314
315    def get_sql_schema_name(self) -> str:
316        """Return the SQL schema name."""
317        cache = self.get_sql_cache()
318        return cache.schema_name
319
320    @property
321    def stream_names(self) -> list[str]:
322        """Return the set of stream names."""
323        return self.connection.stream_names
324
325    @final
326    @property
327    def streams(
328        self,
329    ) -> _SyncResultStreams:
330        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
331
332        This is a convenience wrapper around the `stream_names`
333        property and `get_dataset()` method.
334        """
335        return self._SyncResultStreams(self)
336
337    class _SyncResultStreams(Mapping[str, CachedDataset]):
338        """A mapping of stream names to cached datasets."""
339
340        def __init__(
341            self,
342            parent: SyncResult,
343            /,
344        ) -> None:
345            self.parent: SyncResult = parent
346
347        def __getitem__(self, key: str) -> CachedDataset:
348            return self.parent.get_dataset(stream_name=key)
349
350        def __iter__(self) -> Iterator[str]:
351            return iter(self.parent.stream_names)
352
353        def __len__(self) -> int:
354            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult(
    workspace: CloudWorkspace,
    connection: CloudConnection,
    job_id: int,
    table_name_prefix: str = '',
    table_name_suffix: str = '',
    _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None,
    _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None,
    _cache: airbyte.caches.CacheBase | None = None,
)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job."""
149        return f"{self.connection.job_history_url}/{self.job_id}"

Return the URL of the sync job.
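
For instance, the URL can be surfaced in logs or notifications so operators can jump straight to the job in the Airbyte UI. A minimal sketch, assuming a `sync_result` obtained from `connection.run_sync()` as in the module examples:

```python
# Assuming `sync_result` was returned by `connection.run_sync()`...
print(f"Follow the sync here: {sync_result.job_url}")
```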

def is_job_complete(self) -> bool:
176    def is_job_complete(self) -> bool:
177        """Check if the sync job is complete."""
178        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
180    def get_job_status(self) -> JobStatusEnum:
181        """Check if the sync job is still running."""
182        return self._fetch_latest_job_info().status

Return the latest status of the sync job.
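
A minimal status-polling sketch, assuming a `sync_result` object as above (`JobStatusEnum` is re-exported from `airbyte.cloud`):

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.RUNNING:
    print("Sync is still in progress...")
elif sync_result.is_job_complete():
    print(f"Sync reached a final status: {status}")
```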

bytes_synced: int
197    @property
198    def bytes_synced(self) -> int:
199        """Return the number of records processed."""
200        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes processed.

records_synced: int
202    @property
203    def records_synced(self) -> int:
204        """Return the number of records processed."""
205        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records processed.

start_time: datetime.datetime
207    @property
208    def start_time(self) -> datetime:
209        """Return the start time of the sync job in UTC."""
210        # Parse from ISO 8601 format:
211        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

Return the start time of the sync job in UTC.
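
The three read-only properties above can be combined into a simple job summary. A sketch, assuming `sync_result` references a completed job:

```python
print(f"Started at:     {sync_result.start_time}")  # datetime parsed from ISO 8601
print(f"Records synced: {sync_result.records_synced}")
print(f"Bytes synced:   {sync_result.bytes_synced}")
```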

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
213    def raise_failure_status(
214        self,
215        *,
216        refresh_status: bool = False,
217    ) -> None:
218        """Raise an exception if the sync job failed.
219
220        By default, this method will use the latest status available. If you want to refresh the
221        status before checking for failure, set `refresh_status=True`. If the job has failed, this
222        method will raise an `AirbyteConnectionSyncError`.
223
224        Otherwise, do nothing.
225        """
226        if not refresh_status and self._latest_job_info:
227            latest_status = self._latest_job_info.status
228        else:
229            latest_status = self.get_job_status()
230
231        if latest_status in FAILED_STATUSES:
232            raise AirbyteConnectionSyncError(
233                workspace=self.workspace,
234                connection_id=self.connection.connection_id,
235                job_id=self.job_id,
236                job_status=self.get_job_status(),
237            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
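
A sketch of failure handling, assuming `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`, as elsewhere in PyAirbyte:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    # Refresh the status from the API before checking for failure:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
```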

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
239    def wait_for_completion(
240        self,
241        *,
242        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
243        raise_timeout: bool = True,
244        raise_failure: bool = False,
245    ) -> JobStatusEnum:
246        """Wait for a job to finish running."""
247        start_time = time.time()
248        while True:
249            latest_status = self.get_job_status()
250            if latest_status in FINAL_STATUSES:
251                if raise_failure:
252                    # No-op unless the job failed:
253                    self.raise_failure_status()
254
255                return latest_status
256
257            if time.time() - start_time > wait_timeout:
258                if raise_timeout:
259                    raise AirbyteConnectionSyncTimeoutError(
260                        workspace=self.workspace,
261                        connection_id=self.connection.connection_id,
262                        job_id=self.job_id,
263                        job_status=latest_status,
264                        timeout=wait_timeout,
265                    )
266
267                return latest_status  # This will be a non-final status
268
269            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
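
For example, to wait up to ten minutes and inspect the outcome without raising on timeout or failure (a sketch, assuming `sync_result` as above):

```python
final_status = sync_result.wait_for_completion(
    wait_timeout=600,     # seconds; the default is 1800
    raise_timeout=False,  # on timeout, return the non-final status instead of raising
    raise_failure=False,  # don't raise AirbyteConnectionSyncError on failure
)
print(f"Job ended (or timed out) with status: {final_status}")
```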

def get_sql_cache(self) -> airbyte.caches.CacheBase:
271    def get_sql_cache(self) -> CacheBase:
272        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
273        if self._cache:
274            return self._cache
275
276        destination_configuration = self._get_destination_configuration()
277        self._cache = destination_to_cache(destination_configuration=destination_configuration)
278        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.
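
A sketch, assuming the connection writes to a supported SQL destination (currently Snowflake or BigQuery):

```python
cache = sync_result.get_sql_cache()
print(f"Cache schema: {cache.schema_name}")
```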

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
280    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
281        """Return a SQL Engine for querying a SQL-based destination."""
282        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
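
The engine can be used for ad-hoc SQL against the destination. A sketch, assuming a SQL destination; the `users` stream name is hypothetical, and `get_sql_table_name()` resolves the actual table name:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")  # "users" is illustrative
with engine.connect() as conn:
    # Illustrative only; don't interpolate untrusted input into SQL.
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
    print(f"{table_name}: {row_count} rows")
```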

def get_sql_table_name(self, stream_name: str) -> str:
284    def get_sql_table_name(self, stream_name: str) -> str:
285        """Return the SQL table name of the named stream."""
286        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
288    def get_sql_table(
289        self,
290        stream_name: str,
291    ) -> sqlalchemy.Table:
292        """Return a SQLAlchemy table object for the named stream."""
293        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.
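
A sketch using the SQLAlchemy table object to build a typed query rather than raw SQL (the `users` stream name is illustrative):

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")  # hypothetical stream name
query = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(query):
        print(row)
```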

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
295    def get_dataset(self, stream_name: str) -> CachedDataset:
296        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
297
298        This can be used to read and analyze the data in a SQL-based destination.
299
300        TODO: In a future iteration, we can consider providing stream configuration information
301              (catalog information) to the `CachedDataset` object via the "Get stream properties"
302              API: https://reference.airbyte.com/reference/getstreamproperties
303        """
304        return CachedDataset(
305            self.get_sql_cache(),
306            stream_name=stream_name,
307            stream_configuration=False,  # Don't look for stream configuration in cache.
308        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
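
A sketch, assuming a SQL destination; `users` is an illustrative stream name, and `to_pandas()` is the conversion helper exposed by PyAirbyte datasets:

```python
dataset = sync_result.get_dataset("users")  # hypothetical stream name

# Iterate over the records directly...
for record in dataset:
    print(record)

# ...or convert to a pandas DataFrame for analysis:
df = dataset.to_pandas()
print(df.head())
```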

def get_sql_database_name(self) -> str:
310    def get_sql_database_name(self) -> str:
311        """Return the SQL database name."""
312        cache = self.get_sql_cache()
313        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
315    def get_sql_schema_name(self) -> str:
316        """Return the SQL schema name."""
317        cache = self.get_sql_cache()
318        return cache.schema_name

Return the SQL schema name.
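
These two helpers, combined with `get_sql_table_name()`, are enough to build a fully qualified table reference. A sketch (the `users` stream name is illustrative; exact quoting and qualification rules vary by destination):

```python
fq_table = ".".join([
    sync_result.get_sql_database_name(),
    sync_result.get_sql_schema_name(),
    sync_result.get_sql_table_name("users"),  # hypothetical stream name
])
print(f"Fully qualified table: {fq_table}")
```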

stream_names: list[str]
320    @property
321    def stream_names(self) -> list[str]:
322        """Return the set of stream names."""
323        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
325    @final
326    @property
327    def streams(
328        self,
329    ) -> _SyncResultStreams:
330        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
331
332        This is a convenience wrapper around the `stream_names`
333        property and `get_dataset()` method.
334        """
335        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
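
Because `streams` implements the `Mapping` protocol, the usual dict-style idioms apply. A sketch, assuming `sync_result` as above:

```python
for stream_name, dataset in sync_result.streams.items():
    table = dataset.to_sql_table()
    print(f"{stream_name} -> {table.name}")
```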

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of the possible statuses of an Airbyte Cloud sync job.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
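
Because `JobStatusEnum` subclasses `str`, members compare equal to their raw string values, which is convenient when matching API responses. A sketch, assuming a `sync_result` as above:

```python
from airbyte.cloud import JobStatusEnum

assert JobStatusEnum.SUCCEEDED == "succeeded"  # str subclass: compares to raw strings

status = sync_result.get_job_status()
if status in {JobStatusEnum.FAILED, JobStatusEnum.CANCELLED, JobStatusEnum.INCOMPLETE}:
    print("The job did not succeed.")
```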