airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Sync Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently, this is supported for Snowflake and BigQuery destinations only.

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
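
For SQL-based destinations, you can also run ad-hoc queries against the synced tables. A minimal sketch, reusing the `sync_result` object from above and assuming the destination is one PyAirbyte can read from (e.g. Snowflake or BigQuery):

# Sketch: query the destination directly via SQLAlchemy.
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar_one()
print(f"Rows in users table: {row_count}")
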
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently, this is supported for Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, client_id: airbyte.secrets.SecretString, client_secret: airbyte.secrets.SecretString, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
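
For example, a minimal fail-fast credentials check (the secret names below are placeholders):

import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)
workspace.connect()  # Raises if the workspace is unreachable or credentials are wrong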

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source( self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
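
For illustration, a sketch deploying a locally configured connector; `source-faker` and its config values are placeholder choices:

import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 100},  # hypothetical config for the faker source
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # avoids duplicate-name errors on repeated runs
)
print(cloud_source.connector_id)
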
def deploy_destination( self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
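
A sketch deploying from a plain config dict; `destinationType` is required in this case, and the config keys shown are hypothetical:

cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",              # required when passing a dict
        "destination_path": "/local/db.duckdb",   # hypothetical config value
    },
    unique=True,  # raise if a destination with this name already exists
)
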
def permanently_delete_source(self, source: str | airbyte.cloud.connectors.CloudSource) -> None:
    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination) -> None:
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

Delete a deployed destination from the workspace.

You can pass either a CloudDestination object or the deployed destination ID as a str.

def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
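
Putting it together, a sketch that pairs the deployed source and destination from the examples above (stream names are placeholders):

connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,            # or a source ID string
    destination=cloud_destination,  # or a destination ID string
    selected_streams=["users", "products"],  # required; must not be empty
    table_prefix="faker_",
)
print(connection.connection_url)
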
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

Delete a deployed connection from the workspace.

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[CloudConnection]:
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

List connections by name in the workspace.

TODO: Add pagination support
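
A sketch of both filtering styles, assuming `name_filter` is called with each connection's name:

# Exact-name match:
prod_connections = workspace.list_connections(name="Prod Sync")

# Or a case-insensitive substring match via `name_filter`:
prod_connections = workspace.list_connections(
    name_filter=lambda name: "prod" in name.lower(),
)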

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

List all sources in the workspace.

TODO: Add pagination support

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]

List all destinations in the workspace.

TODO: Add pagination support

class CloudConnection:

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

Get the source object.

destination_id: str
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

Run a sync.
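
A sketch of a non-blocking run, polling for completion separately:

# Trigger the sync without blocking, then wait with a custom timeout:
sync_result = connection.run_sync(wait=False)
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,   # raise AirbyteConnectionSyncError on failure
    raise_timeout=False,  # return the non-final status instead of raising
)
print(status)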

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

Get the previous sync logs for a connection.
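
For example, to summarize recent runs:

for result in connection.get_previous_sync_logs(limit=5):
    print(result.job_id, result.get_job_status(), result.records_synced)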

def get_sync_result( self, job_id: int | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
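
For example:

# Inspect the most recent sync, if any:
latest = connection.get_sync_result()
if latest is not None:
    print(latest.get_job_status())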

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.
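
A sketch; note this operation is irreversible:

# Delete the connection and, optionally, its source and destination:
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=True,
)
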
@dataclass
class SyncResult:
129@dataclass
130class SyncResult:
131    """The result of a sync operation.
132
133    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
134    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
135    """
136
137    workspace: CloudWorkspace
138    connection: CloudConnection
139    job_id: int
140    table_name_prefix: str = ""
141    table_name_suffix: str = ""
142    _latest_job_info: JobResponse | None = None
143    _connection_response: ConnectionResponse | None = None
144    _cache: CacheBase | None = None
145
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job.
149
150        Note: This currently returns the connection's job history URL, as there is no direct URL
151        to a specific job in the Airbyte Cloud web app.
152
153        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
154              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
155        """
156        return f"{self.connection.job_history_url}"
157
158    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
159        """Return connection info for the sync job."""
160        if self._connection_response and not force_refresh:
161            return self._connection_response
162
163        self._connection_response = api_util.get_connection(
164            workspace_id=self.workspace.workspace_id,
165            api_root=self.workspace.api_root,
166            connection_id=self.connection.connection_id,
167            client_id=self.workspace.client_id,
168            client_secret=self.workspace.client_secret,
169        )
170        return self._connection_response
171
172    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
173        """Return the destination configuration for the sync job."""
174        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
175        destination_response = api_util.get_destination(
176            destination_id=connection_info.destination_id,
177            api_root=self.workspace.api_root,
178            client_id=self.workspace.client_id,
179            client_secret=self.workspace.client_secret,
180        )
181        return asdict(destination_response.configuration)
182
183    def is_job_complete(self) -> bool:
184        """Check if the sync job is complete."""
185        return self.get_job_status() in FINAL_STATUSES
186
187    def get_job_status(self) -> JobStatusEnum:
188        """Check if the sync job is still running."""
189        return self._fetch_latest_job_info().status
190
191    def _fetch_latest_job_info(self) -> JobResponse:
192        """Return the job info for the sync job."""
193        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
194            return self._latest_job_info
195
196        self._latest_job_info = api_util.get_job_info(
197            job_id=self.job_id,
198            api_root=self.workspace.api_root,
199            client_id=self.workspace.client_id,
200            client_secret=self.workspace.client_secret,
201        )
202        return self._latest_job_info
203
204    @property
205    def bytes_synced(self) -> int:
206        """Return the number of records processed."""
207        return self._fetch_latest_job_info().bytes_synced or 0
208
209    @property
210    def records_synced(self) -> int:
211        """Return the number of records synced."""
212        return self._fetch_latest_job_info().rows_synced or 0
213
214    @property
215    def start_time(self) -> datetime:
216        """Return the start time of the sync job in UTC."""
217        # Parse from ISO 8601 format:
218        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
219
220    def raise_failure_status(
221        self,
222        *,
223        refresh_status: bool = False,
224    ) -> None:
225        """Raise an exception if the sync job failed.
226
227        By default, this method will use the latest status available. If you want to refresh the
228        status before checking for failure, set `refresh_status=True`. If the job has failed, this
229        method will raise an `AirbyteConnectionSyncError`.
230
231        Otherwise, do nothing.
232        """
233        if not refresh_status and self._latest_job_info:
234            latest_status = self._latest_job_info.status
235        else:
236            latest_status = self.get_job_status()
237
238        if latest_status in FAILED_STATUSES:
239            raise AirbyteConnectionSyncError(
240                workspace=self.workspace,
241                connection_id=self.connection.connection_id,
242                job_id=self.job_id,
243                job_status=self.get_job_status(),
244            )
245
246    def wait_for_completion(
247        self,
248        *,
249        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
250        raise_timeout: bool = True,
251        raise_failure: bool = False,
252    ) -> JobStatusEnum:
253        """Wait for a job to finish running."""
254        start_time = time.time()
255        while True:
256            latest_status = self.get_job_status()
257            if latest_status in FINAL_STATUSES:
258                if raise_failure:
259                    # No-op if the job succeeded or is still running:
260                    self.raise_failure_status()
261
262                return latest_status
263
264            if time.time() - start_time > wait_timeout:
265                if raise_timeout:
266                    raise AirbyteConnectionSyncTimeoutError(
267                        workspace=self.workspace,
268                        connection_id=self.connection.connection_id,
269                        job_id=self.job_id,
270                        job_status=latest_status,
271                        timeout=wait_timeout,
272                    )
273
274                return latest_status  # This will be a non-final status
275
276            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
277
278    def get_sql_cache(self) -> CacheBase:
279        """Return a SQL Cache object for working with the data in a SQL-based destination."""
280        if self._cache:
281            return self._cache
282
283        destination_configuration = self._get_destination_configuration()
284        self._cache = destination_to_cache(destination_configuration=destination_configuration)
285        return self._cache
286
287    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
288        """Return a SQL Engine for querying a SQL-based destination."""
289        return self.get_sql_cache().get_sql_engine()
290
291    def get_sql_table_name(self, stream_name: str) -> str:
292        """Return the SQL table name of the named stream."""
293        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
294
295    def get_sql_table(
296        self,
297        stream_name: str,
298    ) -> sqlalchemy.Table:
299        """Return a SQLAlchemy table object for the named stream."""
300        return self.get_sql_cache().processor.get_sql_table(stream_name)
301
302    def get_dataset(self, stream_name: str) -> CachedDataset:
303        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
304
305        This can be used to read and analyze the data in a SQL-based destination.
306
307        TODO: In a future iteration, we can consider providing stream configuration information
308              (catalog information) to the `CachedDataset` object via the "Get stream properties"
309              API: https://reference.airbyte.com/reference/getstreamproperties
310        """
311        return CachedDataset(
312            self.get_sql_cache(),
313            stream_name=stream_name,
314            stream_configuration=False,  # Don't look for stream configuration in cache.
315        )
316
317    def get_sql_database_name(self) -> str:
318        """Return the SQL database name."""
319        cache = self.get_sql_cache()
320        return cache.get_database_name()
321
322    def get_sql_schema_name(self) -> str:
323        """Return the SQL schema name."""
324        cache = self.get_sql_cache()
325        return cache.schema_name
326
327    @property
328    def stream_names(self) -> list[str]:
329        """Return the list of stream names."""
330        return self.connection.stream_names
331
332    @final
333    @property
334    def streams(
335        self,
336    ) -> _SyncResultStreams:
337        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
338
339        This is a convenience wrapper around the `stream_names`
340        property and `get_dataset()` method.
341        """
342        return self._SyncResultStreams(self)
343
344    class _SyncResultStreams(Mapping[str, CachedDataset]):
345        """A mapping of stream names to cached datasets."""
346
347        def __init__(
348            self,
349            parent: SyncResult,
350            /,
351        ) -> None:
352            self.parent: SyncResult = parent
353
354        def __getitem__(self, key: str) -> CachedDataset:
355            return self.parent.get_dataset(stream_name=key)
356
357        def __iter__(self) -> Iterator[str]:
358            return iter(self.parent.stream_names)
359
360        def __len__(self) -> int:
361            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the .CloudWorkspace and .CloudConnection objects.

SyncResult(
    workspace: CloudWorkspace,
    connection: CloudConnection,
    job_id: int,
    table_name_prefix: str = '',
    table_name_suffix: str = '',
    _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None,
    _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None,
    _cache: airbyte.caches.CacheBase | None = None,
)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
146    @property
147    def job_url(self) -> str:
148        """Return the URL of the sync job.
149
150        Note: This currently returns the connection's job history URL, as there is no direct URL
151        to a specific job in the Airbyte Cloud web app.
152
153        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
154              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
155        """
156        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
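
For example, you can surface this URL in logs so that operators can jump straight to the job history page. A minimal sketch, assuming a `sync_result` obtained from `connection.run_sync()`:

```python
# Assuming `sync_result` was returned by `connection.run_sync()`
# or `connection.get_sync_result()`:
print(f"Follow sync progress at: {sync_result.job_url}")
```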

def is_job_complete(self) -> bool:
183    def is_job_complete(self) -> bool:
184        """Check if the sync job is complete."""
185        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
187    def get_job_status(self) -> JobStatusEnum:
188        """Return the status of the sync job."""
189        return self._fetch_latest_job_info().status

Return the status of the sync job.
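
A minimal status check might look like the following sketch, which assumes a `sync_result` object obtained as in the module-level examples:

```python
from airbyte.cloud import JobStatusEnum

# Assuming `sync_result` was returned by `connection.run_sync()`:
status = sync_result.get_job_status()
if status is JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif sync_result.is_job_complete():
    print(f"Sync finished with terminal status: {status}")
else:
    print(f"Sync still in progress: {status}")
```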

bytes_synced: int
204    @property
205    def bytes_synced(self) -> int:
206        """Return the number of bytes synced."""
207        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
209    @property
210    def records_synced(self) -> int:
211        """Return the number of records synced."""
212        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.

start_time: datetime.datetime
214    @property
215    def start_time(self) -> datetime:
216        """Return the start time of the sync job in UTC."""
217        # Parse from ISO 8601 format:
218        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

Return the start time of the sync job in UTC.
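
Together with `bytes_synced` and `records_synced`, this property makes it easy to log basic sync metrics. A short sketch, assuming a completed `sync_result`:

```python
# Assuming `sync_result` refers to a completed sync job:
print(f"Sync started at: {sync_result.start_time.isoformat()}")
print(f"Records synced:  {sync_result.records_synced:,}")
print(f"Bytes synced:    {sync_result.bytes_synced:,}")
```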

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
220    def raise_failure_status(
221        self,
222        *,
223        refresh_status: bool = False,
224    ) -> None:
225        """Raise an exception if the sync job failed.
226
227        By default, this method will use the latest status available. If you want to refresh the
228        status before checking for failure, set `refresh_status=True`. If the job has failed, this
229        method will raise an `AirbyteConnectionSyncError`.
230
231        Otherwise, do nothing.
232        """
233        if not refresh_status and self._latest_job_info:
234            latest_status = self._latest_job_info.status
235        else:
236            latest_status = self.get_job_status()
237
238        if latest_status in FAILED_STATUSES:
239            raise AirbyteConnectionSyncError(
240                workspace=self.workspace,
241                connection_id=self.connection.connection_id,
242                job_id=self.job_id,
243                job_status=self.get_job_status(),
244            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
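
A typical pattern is to call this after a sync and convert the failure into your own error handling. A sketch, assuming `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    # Refresh the status from the API before checking for failure:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError:
    print(f"Sync failed for job {sync_result.job_id}: {sync_result.job_url}")
    raise
```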

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
246    def wait_for_completion(
247        self,
248        *,
249        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
250        raise_timeout: bool = True,
251        raise_failure: bool = False,
252    ) -> JobStatusEnum:
253        """Wait for a job to finish running."""
254        start_time = time.time()
255        while True:
256            latest_status = self.get_job_status()
257            if latest_status in FINAL_STATUSES:
258                if raise_failure:
259                    # No-op if the job succeeded or is still running:
260                    self.raise_failure_status()
261
262                return latest_status
263
264            if time.time() - start_time > wait_timeout:
265                if raise_timeout:
266                    raise AirbyteConnectionSyncTimeoutError(
267                        workspace=self.workspace,
268                        connection_id=self.connection.connection_id,
269                        job_id=self.job_id,
270                        job_status=latest_status,
271                        timeout=wait_timeout,
272                    )
273
274                return latest_status  # This will be a non-final status
275
276            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
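
The keyword flags control what happens when the wait ends: `raise_failure=True` raises on failed jobs, and `raise_timeout=False` returns the latest (non-final) status instead of raising when the timeout elapses. A sketch:

```python
# Block for up to 10 minutes; return the status instead of raising on timeout:
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,
    raise_failure=True,  # Raise if the job reaches a failed status
)
if not sync_result.is_job_complete():
    print(f"Timed out waiting for job; latest status: {final_status}")
```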

def get_sql_cache(self) -> airbyte.caches.CacheBase:
278    def get_sql_cache(self) -> CacheBase:
279        """Return a SQL Cache object for working with the data in a SQL-based destination."""
280        if self._cache:
281            return self._cache
282
283        destination_configuration = self._get_destination_configuration()
284        self._cache = destination_to_cache(destination_configuration=destination_configuration)
285        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
287    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
288        """Return a SQL Engine for querying a SQL-based destination."""
289        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.
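
Because this is a standard SQLAlchemy engine, you can run ad-hoc SQL directly against the destination. A minimal sketch; the table name `users` is a hypothetical placeholder, and in practice you may need the prefixed name returned by `get_sql_table_name()`:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
with engine.connect() as conn:
    result = conn.execute(sqlalchemy.text("SELECT COUNT(*) FROM users"))
    print(f"Row count: {result.scalar_one()}")
```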

def get_sql_table_name(self, stream_name: str) -> str:
291    def get_sql_table_name(self, stream_name: str) -> str:
292        """Return the SQL table name of the named stream."""
293        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
295    def get_sql_table(
296        self,
297        stream_name: str,
298    ) -> sqlalchemy.Table:
299        """Return a SQLAlchemy table object for the named stream."""
300        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.
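
The returned `Table` object can be used to compose queries without writing raw SQL. A sketch, assuming the connection includes a `users` stream:

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")
stmt = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```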

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
302    def get_dataset(self, stream_name: str) -> CachedDataset:
303        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
304
305        This can be used to read and analyze the data in a SQL-based destination.
306
307        TODO: In a future iteration, we can consider providing stream configuration information
308              (catalog information) to the `CachedDataset` object via the "Get stream properties"
309              API: https://reference.airbyte.com/reference/getstreamproperties
310        """
311        return CachedDataset(
312            self.get_sql_cache(),
313            stream_name=stream_name,
314            stream_configuration=False,  # Don't look for stream configuration in cache.
315        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
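
The resulting dataset supports the usual `CachedDataset` access patterns, such as record iteration and conversion to a pandas DataFrame. A sketch, assuming a `users` stream and that pandas is installed:

```python
dataset = sync_result.get_dataset("users")

# Iterate over records directly...
for record in dataset:
    print(record)

# ...or load everything into a pandas DataFrame:
df = dataset.to_pandas()
print(df.head())
```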

def get_sql_database_name(self) -> str:
317    def get_sql_database_name(self) -> str:
318        """Return the SQL database name."""
319        cache = self.get_sql_cache()
320        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
322    def get_sql_schema_name(self) -> str:
323        """Return the SQL schema name."""
324        cache = self.get_sql_cache()
325        return cache.schema_name

Return the SQL schema name.
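
Combined with `get_sql_table_name()`, these helpers let you build a fully qualified table reference for use in external tools. A sketch, using a hypothetical `users` stream:

```python
qualified_name = ".".join(
    [
        sync_result.get_sql_database_name(),
        sync_result.get_sql_schema_name(),
        sync_result.get_sql_table_name("users"),
    ]
)
print(f"Fully qualified table: {qualified_name}")
```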

stream_names: list[str]
327    @property
328    def stream_names(self) -> list[str]:
329        """Return the list of stream names."""
330        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
332    @final
333    @property
334    def streams(
335        self,
336    ) -> _SyncResultStreams:
337        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
338
339        This is a convenience wrapper around the `stream_names`
340        property and `get_dataset()` method.
341        """
342        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
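
Since `streams` behaves like a standard mapping, you can index it by stream name or iterate over it. A sketch, with `users` as a hypothetical stream name:

```python
# Look up a single stream's dataset by name:
users_dataset = sync_result.streams["users"]

# Or iterate over all streams in the connection:
for stream_name in sync_result.streams:
    print(f"Stream available: {stream_name}")
```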

class JobStatusEnum(builtins.str, enum.Enum):
 8class JobStatusEnum(str, Enum):
 9    PENDING = 'pending'
10    RUNNING = 'running'
11    INCOMPLETE = 'incomplete'
12    FAILED = 'failed'
13    SUCCEEDED = 'succeeded'
14    CANCELLED = 'cancelled'

An enumeration of possible Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
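
Because the enum also subclasses `str`, members compare equal to their raw string values, which is convenient when handling API payloads. A sketch:

```python
from airbyte.cloud import JobStatusEnum

status = JobStatusEnum.SUCCEEDED
assert status == "succeeded"  # str subclass: compares equal to the raw value
print(f"Job finished with status: {status.value}")
```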