airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the `SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.


```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
client_id: airbyte.secrets.SecretString
client_secret: airbyte.secrets.SecretString
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None

The web URL of the workspace.

def connect(self) -> None:

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
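
For example, a quick credentials check at startup (a minimal sketch; the secret names are illustrative):

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
workspace.connect()  # Raises on bad credentials; prints the workspace URL on success
```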

def get_connection(self, connection_id: str) -> CloudConnection:

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.
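
Because these getters only construct lazy handles, they are cheap to call; API requests happen later, when data is actually needed. A sketch reusing the `workspace` object from above (the IDs and stream names are placeholders):

```python
# No API calls are made here; these are lazy handles:
source = workspace.get_source(source_id="source-id-123")
destination = workspace.get_destination(destination_id="destination-id-456")

# Handles can be passed wherever an ID string is accepted, e.g.:
connection = workspace.deploy_connection(
    connection_name="My Connection",
    source=source,
    destination=destination,
    selected_streams=["users"],
)
```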

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
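
For instance, deploying a locally configured source (a sketch; the `source-faker` connector and its config are illustrative):

```python
import airbyte as ab

# Configure a source locally, then deploy it to the workspace
source = ab.get_source("source-faker", config={"count": 100})
cloud_source = workspace.deploy_source(
    name="my-faker-source",
    source=source,
    random_name_suffix=True,  # Avoids AirbyteDuplicateResourcesError on re-runs
)
```
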
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
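
When passing a plain configuration dictionary rather than a `Destination` object, the `destinationType` key is required. A sketch (the DuckDB/MotherDuck config keys shown are illustrative):

```python
import airbyte as ab

cloud_destination = workspace.deploy_destination(
    name="my-duckdb-destination",
    destination={
        "destinationType": "duckdb",  # Required when passing a dict
        "destination_path": "md:",
        "motherduck_api_key": str(ab.get_secret("MOTHERDUCK_API_KEY")),
    },
)
```
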
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:

Delete a deployed destination from the workspace.

You can pass either a deployed CloudDestination object or the destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
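
Putting the pieces together (a sketch; assumes `cloud_source` and `cloud_destination` were deployed as above, and the stream names match your source):

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",
    source=cloud_source,            # Or pass a source ID string
    destination=cloud_destination,  # Or pass a destination ID string
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
print(connection.connection_url)
```
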
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:

Delete a deployed connection from the workspace.
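
For example, tearing down the connection created above along with its connectors (a sketch; deletion is permanent, so use with care):

```python
workspace.permanently_delete_connection(
    connection,
    cascade_delete_source=True,       # Also delete the deployed source
    cascade_delete_destination=True,  # Also delete the deployed destination
)
```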

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:

List connections by name in the workspace.

TODO: Add pagination support
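
For example, matching by exact name or by predicate (a sketch; the `name_filter` callable is assumed here to receive each connection's name):

```python
# Exact-name match:
exact = workspace.list_connections(name="faker-to-duckdb")

# Predicate match:
nightly = workspace.list_connections(name_filter=lambda n: n.startswith("nightly-"))
for connection in nightly:
    print(connection.connection_id)
```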

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:

List all destinations in the workspace.

TODO: Add pagination support

class CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

name: str | None

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str

The ID of the source.

source: airbyte.cloud.connectors.CloudSource

Get the source object.

destination_id: str

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination

Get the destination object.

stream_names: list[str]

The stream names.

table_prefix: str

The table prefix.

connection_url: str | None

The web URL to the connection.

job_history_url: str | None

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:

Run a sync.
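
With `wait=False`, the call returns immediately and you can poll for completion yourself (a minimal sketch):

```python
import time

sync_result = connection.run_sync(wait=False)
while not sync_result.is_job_complete():
    time.sleep(10)  # Poll every 10 seconds
print(sync_result.get_job_status())
```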

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:

Get the previous sync logs for a connection.
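
For instance, summarizing recent runs (a sketch using `SyncResult` properties shown in the source below):

```python
for result in connection.get_previous_sync_logs(limit=5):
    print(result.job_id, result.get_job_status(), result.records_synced)
```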

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
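
For example, checking the outcome of the most recent run (a sketch; note the None case when no jobs exist yet):

```python
sync_result = connection.get_sync_result()
if sync_result is None:
    print("No sync jobs found for this connection.")
else:
    print(f"Status: {sync_result.get_job_status()}")
    print(f"Records synced: {sync_result.records_synced}")
    print(f"Bytes synced: {sync_result.bytes_synced}")
```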

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
@dataclass
class SyncResult:
216@dataclass
217class SyncResult:
218    """The result of a sync operation.
219
220    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
221    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
222    """
223
224    workspace: CloudWorkspace
225    connection: CloudConnection
226    job_id: int
227    table_name_prefix: str = ""
228    table_name_suffix: str = ""
229    _latest_job_info: JobResponse | None = None
230    _connection_response: ConnectionResponse | None = None
231    _cache: CacheBase | None = None
232    _job_with_attempts_info: dict[str, Any] | None = None
233
234    @property
235    def job_url(self) -> str:
236        """Return the URL of the sync job.
237
238        Note: This currently returns the connection's job history URL, as there is no direct URL
239        to a specific job in the Airbyte Cloud web app.
240
241        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
242              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
243        """
244        return f"{self.connection.job_history_url}"
245
246    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
247        """Return connection info for the sync job."""
248        if self._connection_response and not force_refresh:
249            return self._connection_response
250
251        self._connection_response = api_util.get_connection(
252            workspace_id=self.workspace.workspace_id,
253            api_root=self.workspace.api_root,
254            connection_id=self.connection.connection_id,
255            client_id=self.workspace.client_id,
256            client_secret=self.workspace.client_secret,
257        )
258        return self._connection_response
259
260    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
261        """Return the destination configuration for the sync job."""
262        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
263        destination_response = api_util.get_destination(
264            destination_id=connection_info.destination_id,
265            api_root=self.workspace.api_root,
266            client_id=self.workspace.client_id,
267            client_secret=self.workspace.client_secret,
268        )
269        return asdict(destination_response.configuration)
270
271    def is_job_complete(self) -> bool:
272        """Check if the sync job is complete."""
273        return self.get_job_status() in FINAL_STATUSES
274
275    def get_job_status(self) -> JobStatusEnum:
276        """Return the status of the sync job."""
277        return self._fetch_latest_job_info().status
278
279    def _fetch_latest_job_info(self) -> JobResponse:
280        """Return the job info for the sync job."""
281        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
282            return self._latest_job_info
283
284        self._latest_job_info = api_util.get_job_info(
285            job_id=self.job_id,
286            api_root=self.workspace.api_root,
287            client_id=self.workspace.client_id,
288            client_secret=self.workspace.client_secret,
289        )
290        return self._latest_job_info
291
292    @property
293    def bytes_synced(self) -> int:
294        """Return the number of bytes synced."""
295        return self._fetch_latest_job_info().bytes_synced or 0
296
297    @property
298    def records_synced(self) -> int:
299        """Return the number of records synced."""
300        return self._fetch_latest_job_info().rows_synced or 0
301
302    @property
303    def start_time(self) -> datetime:
304        """Return the start time of the sync job in UTC."""
305        try:
306            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
307        except (ValueError, TypeError) as e:
308            if "Invalid isoformat string" in str(e):
309                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
310                    api_root=self.workspace.api_root,
311                    path="/jobs/get",
312                    json={"id": self.job_id},
313                    client_id=self.workspace.client_id,
314                    client_secret=self.workspace.client_secret,
315                )
316                raw_start_time = job_info_raw.get("startTime")
317                if raw_start_time:
318                    return ab_datetime_parse(raw_start_time)
319            raise
320
321    def _fetch_job_with_attempts(self) -> dict[str, Any]:
322        """Fetch job info with attempts from Config API using lazy loading pattern."""
323        if self._job_with_attempts_info is not None:
324            return self._job_with_attempts_info
325
326        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
327            api_root=self.workspace.api_root,
328            path="/jobs/get",
329            json={
330                "id": self.job_id,
331            },
332            client_id=self.workspace.client_id,
333            client_secret=self.workspace.client_secret,
334        )
335        return self._job_with_attempts_info
336
337    def get_attempts(self) -> list[SyncAttempt]:
338        """Return a list of attempts for this sync job."""
339        job_with_attempts = self._fetch_job_with_attempts()
340        attempts_data = job_with_attempts.get("attempts", [])
341
342        return [
343            SyncAttempt(
344                workspace=self.workspace,
345                connection=self.connection,
346                job_id=self.job_id,
347                attempt_number=i,
348                _attempt_data=attempt_data,
349            )
350            for i, attempt_data in enumerate(attempts_data, start=0)
351        ]
352
353    def raise_failure_status(
354        self,
355        *,
356        refresh_status: bool = False,
357    ) -> None:
358        """Raise an exception if the sync job failed.
359
360        By default, this method will use the latest status available. If you want to refresh the
361        status before checking for failure, set `refresh_status=True`. If the job has failed, this
362        method will raise an `AirbyteConnectionSyncError`.
363
364        Otherwise, do nothing.
365        """
366        if not refresh_status and self._latest_job_info:
367            latest_status = self._latest_job_info.status
368        else:
369            latest_status = self.get_job_status()
370
371        if latest_status in FAILED_STATUSES:
372            raise AirbyteConnectionSyncError(
373                workspace=self.workspace,
374                connection_id=self.connection.connection_id,
375                job_id=self.job_id,
376                job_status=self.get_job_status(),
377            )
378
379    def wait_for_completion(
380        self,
381        *,
382        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
383        raise_timeout: bool = True,
384        raise_failure: bool = False,
385    ) -> JobStatusEnum:
386        """Wait for a job to finish running."""
387        start_time = time.time()
388        while True:
389            latest_status = self.get_job_status()
390            if latest_status in FINAL_STATUSES:
391                if raise_failure:
392                    # No-op if the job succeeded or is still running:
393                    self.raise_failure_status()
394
395                return latest_status
396
397            if time.time() - start_time > wait_timeout:
398                if raise_timeout:
399                    raise AirbyteConnectionSyncTimeoutError(
400                        workspace=self.workspace,
401                        connection_id=self.connection.connection_id,
402                        job_id=self.job_id,
403                        job_status=latest_status,
404                        timeout=wait_timeout,
405                    )
406
407                return latest_status  # This will be a non-final status
408
409            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
410
411    def get_sql_cache(self) -> CacheBase:
412        """Return a SQL Cache object for working with the data in a SQL-based destination."""
413        if self._cache:
414            return self._cache
415
416        destination_configuration = self._get_destination_configuration()
417        self._cache = destination_to_cache(destination_configuration=destination_configuration)
418        return self._cache
419
420    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
421        """Return a SQL Engine for querying a SQL-based destination."""
422        return self.get_sql_cache().get_sql_engine()
423
424    def get_sql_table_name(self, stream_name: str) -> str:
425        """Return the SQL table name of the named stream."""
426        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
427
428    def get_sql_table(
429        self,
430        stream_name: str,
431    ) -> sqlalchemy.Table:
432        """Return a SQLAlchemy table object for the named stream."""
433        return self.get_sql_cache().processor.get_sql_table(stream_name)
434
435    def get_dataset(self, stream_name: str) -> CachedDataset:
436        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
437
438        This can be used to read and analyze the data in a SQL-based destination.
439
440        TODO: In a future iteration, we can consider providing stream configuration information
441              (catalog information) to the `CachedDataset` object via the "Get stream properties"
442              API: https://reference.airbyte.com/reference/getstreamproperties
443        """
444        return CachedDataset(
445            self.get_sql_cache(),
446            stream_name=stream_name,
447            stream_configuration=False,  # Don't look for stream configuration in cache.
448        )
449
450    def get_sql_database_name(self) -> str:
451        """Return the SQL database name."""
452        cache = self.get_sql_cache()
453        return cache.get_database_name()
454
455    def get_sql_schema_name(self) -> str:
456        """Return the SQL schema name."""
457        cache = self.get_sql_cache()
458        return cache.schema_name
459
460    @property
461    def stream_names(self) -> list[str]:
462        """Return the list of stream names."""
463        return self.connection.stream_names
464
465    @final
466    @property
467    def streams(
468        self,
469    ) -> _SyncResultStreams:
470        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
471
472        This is a convenience wrapper around the `stream_names`
473        property and `get_dataset()` method.
474        """
475        return self._SyncResultStreams(self)
476
477    class _SyncResultStreams(Mapping[str, CachedDataset]):
478        """A mapping of stream names to cached datasets."""
479
480        def __init__(
481            self,
482            parent: SyncResult,
483            /,
484        ) -> None:
485            self.parent: SyncResult = parent
486
487        def __getitem__(self, key: str) -> CachedDataset:
488            return self.parent.get_dataset(stream_name=key)
489
490        def __iter__(self) -> Iterator[str]:
491            return iter(self.parent.stream_names)
492
493        def __len__(self) -> int:
494            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the .CloudWorkspace and .CloudConnection objects.

SyncResult(workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
234    @property
235    def job_url(self) -> str:
236        """Return the URL of the sync job.
237
238        Note: This currently returns the connection's job history URL, as there is no direct URL
239        to a specific job in the Airbyte Cloud web app.
240
241        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
242              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
243        """
244        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:
271    def is_job_complete(self) -> bool:
272        """Check if the sync job is complete."""
273        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.
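
For illustration, a hand-rolled polling loop built on this method, assuming an existing sync_result object (in practice, prefer wait_for_completion(), documented below):

import time

# Assuming an existing `sync_result` object.
# Note: this sketch has no timeout; wait_for_completion() handles that.
while not sync_result.is_job_complete():
    time.sleep(30)  # arbitrary poll interval for this sketch
print(sync_result.get_job_status())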

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
275    def get_job_status(self) -> JobStatusEnum:
276        """Return the status of the sync job."""
277        return self._fetch_latest_job_info().status

Return the status of the sync job.
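
The returned value can be compared against JobStatusEnum members, as in this sketch (assuming an existing sync_result object):

from airbyte.cloud import JobStatusEnum

# Assuming an existing `sync_result` object.
status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif status in (JobStatusEnum.PENDING, JobStatusEnum.RUNNING):
    print("Sync still in progress.")
else:
    print(f"Sync ended with status: {status}")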

bytes_synced: int
292    @property
293    def bytes_synced(self) -> int:
294        """Return the number of bytes synced."""
295        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
297    @property
298    def records_synced(self) -> int:
299        """Return the number of records synced."""
300        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.
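
Combined with bytes_synced, this makes a one-line summary of a finished job, assuming an existing sync_result object:

# Assuming an existing `sync_result` object for a completed job.
print(
    f"Synced {sync_result.records_synced} records "
    f"({sync_result.bytes_synced} bytes)."
)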

start_time: datetime.datetime
302    @property
303    def start_time(self) -> datetime:
304        """Return the start time of the sync job in UTC."""
305        try:
306            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
307        except (ValueError, TypeError) as e:
308            if "Invalid isoformat string" in str(e):
309                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
310                    api_root=self.workspace.api_root,
311                    path="/jobs/get",
312                    json={"id": self.job_id},
313                    client_id=self.workspace.client_id,
314                    client_secret=self.workspace.client_secret,
315                )
316                raw_start_time = job_info_raw.get("startTime")
317                if raw_start_time:
318                    return ab_datetime_parse(raw_start_time)
319            raise

Return the start time of the sync job in UTC.
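
A small sketch using the start time, assuming an existing sync_result object and that the returned datetime is timezone-aware UTC (as the docstring states):

from datetime import datetime, timezone

# Assuming an existing `sync_result` object; `start_time` is
# assumed to be timezone-aware UTC.
started = sync_result.start_time
age = datetime.now(timezone.utc) - started
print(f"Job started at {started.isoformat()} ({age} ago).")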

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
337    def get_attempts(self) -> list[SyncAttempt]:
338        """Return a list of attempts for this sync job."""
339        job_with_attempts = self._fetch_job_with_attempts()
340        attempts_data = job_with_attempts.get("attempts", [])
341
342        return [
343            SyncAttempt(
344                workspace=self.workspace,
345                connection=self.connection,
346                job_id=self.job_id,
347                attempt_number=i,
348                _attempt_data=attempt_data,
349            )
350            for i, attempt_data in enumerate(attempts_data, start=0)
351        ]

Return a list of attempts for this sync job.
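
For example, to list the attempts of a job, assuming an existing sync_result object (job_id and attempt_number are the fields shown in the constructor call above):

# Assuming an existing `sync_result` object.
for attempt in sync_result.get_attempts():
    print(f"Job {attempt.job_id}, attempt #{attempt.attempt_number}")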

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
353    def raise_failure_status(
354        self,
355        *,
356        refresh_status: bool = False,
357    ) -> None:
358        """Raise an exception if the sync job failed.
359
360        By default, this method will use the latest status available. If you want to refresh the
361        status before checking for failure, set `refresh_status=True`. If the job has failed, this
362        method will raise an `AirbyteConnectionSyncError`.
363
364        Otherwise, do nothing.
365        """
366        if not refresh_status and self._latest_job_info:
367            latest_status = self._latest_job_info.status
368        else:
369            latest_status = self.get_job_status()
370
371        if latest_status in FAILED_STATUSES:
372            raise AirbyteConnectionSyncError(
373                workspace=self.workspace,
374                connection_id=self.connection.connection_id,
375                job_id=self.job_id,
376                job_status=self.get_job_status(),
377            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
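
A minimal sketch, assuming an existing sync_result object; the import path airbyte.exceptions for AirbyteConnectionSyncError is an assumption, not confirmed by this page:

# Import path `airbyte.exceptions` is an assumption.
from airbyte.exceptions import AirbyteConnectionSyncError

# Assuming an existing `sync_result` object.
try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as exc:
    print(f"Sync failed: {exc}")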

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
379    def wait_for_completion(
380        self,
381        *,
382        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
383        raise_timeout: bool = True,
384        raise_failure: bool = False,
385    ) -> JobStatusEnum:
386        """Wait for a job to finish running."""
387        start_time = time.time()
388        while True:
389            latest_status = self.get_job_status()
390            if latest_status in FINAL_STATUSES:
391                if raise_failure:
392                    # No-op if the job succeeded or is still running:
393                    self.raise_failure_status()
394
395                return latest_status
396
397            if time.time() - start_time > wait_timeout:
398                if raise_timeout:
399                    raise AirbyteConnectionSyncTimeoutError(
400                        workspace=self.workspace,
401                        connection_id=self.connection.connection_id,
402                        job_id=self.job_id,
403                        job_status=latest_status,
404                        timeout=wait_timeout,
405                    )
406
407                return latest_status  # This will be a non-final status
408
409            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
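
For example, to block for up to ten minutes and raise only on failure (not on timeout), assuming an existing sync_result object:

# Assuming an existing `sync_result` object.
status = sync_result.wait_for_completion(
    wait_timeout=600,     # ten minutes
    raise_timeout=False,  # return the non-final status instead of raising
    raise_failure=True,   # raise AirbyteConnectionSyncError on failure
)
print(f"Status after waiting: {status}")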

def get_sql_cache(self) -> airbyte.caches.CacheBase:
411    def get_sql_cache(self) -> CacheBase:
412        """Return a SQL Cache object for working with the data in a SQL-based destination."""
413        if self._cache:
414            return self._cache
415
416        destination_configuration = self._get_destination_configuration()
417        self._cache = destination_to_cache(destination_configuration=destination_configuration)
418        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
420    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
421        """Return a SQL Engine for querying a SQL-based destination."""
422        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
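
A sketch of ad-hoc SQL against the destination, assuming an existing sync_result object with a supported SQL destination; users is a hypothetical stream name:

import sqlalchemy

# Assuming an existing `sync_result` object; "users" is hypothetical.
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar_one()
print(f"{table_name} holds {count} rows.")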

def get_sql_table_name(self, stream_name: str) -> str:
424    def get_sql_table_name(self, stream_name: str) -> str:
425        """Return the SQL table name of the named stream."""
426        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
428    def get_sql_table(
429        self,
430        stream_name: str,
431    ) -> sqlalchemy.Table:
432        """Return a SQLAlchemy table object for the named stream."""
433        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.
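
A sketch that builds a SQLAlchemy query against the returned table, assuming an existing sync_result object; users is a hypothetical stream name:

import sqlalchemy

# Assuming an existing `sync_result` object; "users" is hypothetical.
users_table = sync_result.get_sql_table("users")
query = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(query):
        print(row)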

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
435    def get_dataset(self, stream_name: str) -> CachedDataset:
436        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
437
438        This can be used to read and analyze the data in a SQL-based destination.
439
440        TODO: In a future iteration, we can consider providing stream configuration information
441              (catalog information) to the `CachedDataset` object via the "Get stream properties"
442              API: https://reference.airbyte.com/reference/getstreamproperties
443        """
444        return CachedDataset(
445            self.get_sql_cache(),
446            stream_name=stream_name,
447            stream_configuration=False,  # Don't look for stream configuration in cache.
448        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
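
For analysis, the dataset can be converted to a pandas DataFrame, as in this sketch (assuming an existing sync_result object, a hypothetical users stream, and that CachedDataset exposes to_pandas()):

# Assuming an existing `sync_result` object; "users" is hypothetical.
dataset = sync_result.get_dataset("users")
df = dataset.to_pandas()  # assumes CachedDataset.to_pandas() is available
print(df.head())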

def get_sql_database_name(self) -> str:
450    def get_sql_database_name(self) -> str:
451        """Return the SQL database name."""
452        cache = self.get_sql_cache()
453        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
455    def get_sql_schema_name(self) -> str:
456        """Return the SQL schema name."""
457        cache = self.get_sql_cache()
458        return cache.schema_name

Return the SQL schema name.
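
Together with get_sql_database_name(), this pinpoints where the synced tables live, assuming an existing sync_result object:

# Assuming an existing `sync_result` object.
print(
    f"Data landed in "
    f"{sync_result.get_sql_database_name()}.{sync_result.get_sql_schema_name()}"
)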

stream_names: list[str]
460    @property
461    def stream_names(self) -> list[str]:
462        """Return the list of stream names."""
463        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
465    @final
466    @property
467    def streams(
468        self,
469    ) -> _SyncResultStreams:
470        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
471
472        This is a convenience wrapper around the `stream_names`
473        property and `get_dataset()` method.
474        """
475        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
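
Because streams is a Mapping, it supports dict-style iteration and lookup, as in this sketch (assuming an existing sync_result object and that each CachedDataset exposes to_pandas()):

# Assuming an existing `sync_result` object.
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name} -> {len(dataset.to_pandas())} rows")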

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
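
Because JobStatusEnum subclasses str, members compare equal to their plain string values, which is handy when matching raw API payloads:

from airbyte.cloud import JobStatusEnum

# str-enum members compare equal to their string values...
assert JobStatusEnum.SUCCEEDED == "succeeded"
# ...and can be constructed from raw strings returned by the API.
assert JobStatusEnum("failed") is JobStatusEnum.FAILED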