airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Usage Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently, Snowflake and BigQuery are the only supported destinations.

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)

ℹ️ Experimental Features

You can use the airbyte.cloud.experimental module to access experimental features. These additional features are subject to change and may not be available in all environments.
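For example, a minimal sketch of opting in (this assumes only that the submodule is importable; the exact names it exports vary by release):

```python
# Experimental features live in a dedicated submodule; anything imported
# from here may change or disappear between PyAirbyte releases.
from airbyte.cloud import experimental
```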

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.


```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
"""  # noqa: RUF002  # Allow emoji

from __future__ import annotations

from airbyte.cloud import connections, constants, sync_results, workspaces
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    api_key: str
    api_root: str = CLOUD_API_ROOT

    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy and delete sources

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_source(
        self,
        source: Source,
    ) -> str:
        """Deploy a source to the workspace.

        Returns the newly deployed source ID.
        """
        source_configuration = source.get_config().copy()
        source_configuration["sourceType"] = source.name.replace("source-", "")

        deployed_source = create_source(
            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=source_configuration,
        )

        # Set the deployment IDs on the source object
        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing non-public API

        return deployed_source.source_id

    def _permanently_delete_source(
        self,
        source: str | Source,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, str | Source):
            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

        if isinstance(source, Source):
            if not source._deployed_source_id:  # noqa: SLF001
                raise ValueError("Source has not been deployed.")  # noqa: TRY003

            source_id = source._deployed_source_id  # noqa: SLF001

        elif isinstance(source, str):
            source_id = source

        delete_source(
            source_id=source_id,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete destinations

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_cache_as_destination(
        self,
        cache: CacheBase,
    ) -> str:
        """Deploy a cache to the workspace as a new destination.

        Returns the newly deployed destination ID.
        """
        cache_type_name = cache.__class__.__name__.replace("Cache", "")

        deployed_destination: DestinationResponse = create_destination(
            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=get_destination_config_from_cache(cache),
        )

        # Set the deployment IDs on the cache object
        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing non-public API

        return deployed_destination.destination_id

    def _permanently_delete_destination(
        self,
        *,
        destination: str | None = None,
        cache: CacheBase | None = None,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.
        """
        if destination is None and cache is None:
            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
        if destination is not None and cache is not None:
            raise ValueError(  # noqa: TRY003
                "You must provide either a destination ID or a cache object, not both."
            )

        if cache:
            if not cache._deployed_destination_id:  # noqa: SLF001
                raise ValueError("Cache has not been deployed.")  # noqa: TRY003

            destination = cache._deployed_destination_id  # noqa: SLF001

        if destination is None:
            raise ValueError("No destination ID provided.")  # noqa: TRY003

        delete_destination(
            destination_id=destination,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete connections

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_connection(
        self,
        source: Source | str,
        cache: CacheBase | None = None,
        destination: str | None = None,
        table_prefix: str | None = None,
        selected_streams: list[str] | None = None,
    ) -> CloudConnection:
        """Deploy a source and cache to the workspace as a new connection.

        Returns the newly deployed connection ID as a `str`.

        Args:
            source (Source | str): The source to deploy. You can pass either an already deployed
                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
                it will be deployed automatically.
            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
                `cache` or `destination`, but not both.
            destination (str, optional): The destination ID to use. You can provide
                `cache` or `destination`, but not both.
            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
                the cache's table prefix will be used.
            selected_streams (list[str], optional): The selected stream names to use for the
                connection. If not provided, the source's selected streams will be used.
        """
        # Resolve source ID
        source_id: str
        if isinstance(source, Source):
            selected_streams = selected_streams or source.get_selected_streams()
            source_id = (
                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
                or self._deploy_source(source)
            )
        else:
            source_id = source
            if not selected_streams:
                raise exc.PyAirbyteInputError(
                    guidance="You must provide `selected_streams` when deploying a source ID."
                )

        # Resolve destination ID
        destination_id: str
        if destination:
            destination_id = destination
        elif cache:
            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
            if not cache._deployed_destination_id:  # noqa: SLF001
                destination_id = self._deploy_cache_as_destination(cache)
            else:
                destination_id = cache._deployed_destination_id  # noqa: SLF001
        else:
            raise exc.PyAirbyteInputError(
                guidance="You must provide either a destination ID or a cache object."
            )

        assert source_id is not None
        assert destination_id is not None

        deployed_connection = create_connection(
            name="Connection (Deployed by PyAirbyte)",
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
        )

        if isinstance(source, Source):
            source._deployed_api_root = self.api_root  # noqa: SLF001
            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            source._deployed_source_id = source_id  # noqa: SLF001
        if cache:
            cache._deployed_api_root = self.api_root  # noqa: SLF001
            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def _permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")  # noqa: TRY003

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        if delete_source:
            self._permanently_delete_source(source=connection.source_id)

        if delete_destination:
            self._permanently_delete_destination(destination=connection.destination_id)

    # Run syncs

    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Get sync results and previous sync logs

    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace(workspace_id: str, api_key: str, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_key: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

The URL of the workspace.

def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
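For example, a quick credential check at startup might look like this (reusing the `workspace` construction from the module example above):

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Raises if the workspace is unreachable or the credentials are wrong;
# prints a confirmation message otherwise.
workspace.connect()
```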

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.
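Because construction is lazy, creating the object is cheap; remote data is fetched only when a property requires it. A short sketch (the connection ID is a placeholder):

```python
connection = workspace.get_connection(connection_id="456")

# Built from local IDs; no API call is made here.
print(connection.connection_url)

# Accessing stream names triggers a fetch of the connection info.
print(connection.stream_names)
```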

def run_sync(self, connection_id: str, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

Run a sync on a deployed connection.
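For example, to kick off a sync without blocking and wait for it later (a sketch using only the methods documented on this page; IDs are placeholders):

```python
# Start the sync but return immediately rather than waiting.
sync_result = workspace.run_sync(
    connection_id="456",
    wait=False,
)

# Later: block until the job reaches a final status, or give up quietly.
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,  # Return the non-final status instead of raising.
)
print(final_status)
```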

def get_sync_result(self, connection_id: str, job_id: str | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

Get the sync result for a connection job.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
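A sketch of both call styles (IDs are placeholders):

```python
# Most recent job, if any:
sync_result = workspace.get_sync_result(connection_id="456")
if sync_result is None:
    print("No sync jobs found for this connection.")
else:
    print(sync_result.get_job_status())

# A specific job by ID (lazily loaded; no API call until needed):
older_result = workspace.get_sync_result(connection_id="456", job_id="789")
```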

def get_previous_sync_logs(self, connection_id: str, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )

Get the previous sync logs for a connection.
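For example, to summarize recent jobs (a sketch; the connection ID is a placeholder):

```python
# Inspect the last five sync jobs for a connection.
for result in workspace.get_previous_sync_logs(connection_id="456", limit=5):
    print(result.job_id, result.get_job_status(), result.records_synced)
```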

class CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=self.source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=self.destination_id,
            )
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection(workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.
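In practice, that looks like the following sketch (the ID is a placeholder):

```python
# Preferred: let the workspace build the connection object.
connection = workspace.get_connection(connection_id="456")

# Source and destination IDs are fetched lazily on first access.
print(connection.source_id)
print(connection.destination_id)
```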

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

The ID of the source.

destination_id: str
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

The ID of the destination.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

The stream names.

table_prefix: str
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

The table prefix.

connection_url: str | None
    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The URL to the connection.

job_history_url: str | None
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

The URL to the job history for the connection.

def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

Run a sync.

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

Get the previous sync logs for a connection.

def get_sync_result(self, job_id: str | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

@dataclass
class SyncResult:
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: str
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            connection_id=self.connection.connection_id,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return destination_response.configuration

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the set of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the .CloudWorkspace and .CloudConnection objects.

SyncResult(workspace: CloudWorkspace, connection: CloudConnection, job_id: str, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: str
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

Return the URL of the sync job.

def is_job_complete(self) -> bool:
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

Return the latest status of the sync job.

bytes_synced: int
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced

Return the number of bytes processed.

records_synced: int
    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced

Return the number of records processed.

start_time: datetime.datetime
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

Return the start time of the sync job in UTC.

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
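A sketch of typical usage (this assumes `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`, which is not shown on this page):

```python
from airbyte.exceptions import AirbyteConnectionSyncError  # assumed import path

try:
    # Refresh the status first, then raise if the job failed.
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
```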

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:
    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
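The two flags decide whether a timeout or a failed job raises or simply returns the status. A sketch:

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.wait_for_completion(
    wait_timeout=900,     # Stop waiting after 15 minutes.
    raise_timeout=False,  # On timeout, return the (non-final) status.
    raise_failure=True,   # Raise on a failed job.
)
if status == JobStatusEnum.SUCCEEDED:
    print("Sync finished successfully.")
```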

def get_sql_cache(self) -> airbyte.caches.CacheBase:
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
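Mirroring the module-level example above ("users" is a placeholder stream name):

```python
dataset = sync_result.get_dataset("users")

# Iterate over records directly...
for record in dataset:
    print(record)

# ...or drop down to SQLAlchemy for ad-hoc queries.
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
```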

def get_sql_database_name(self) -> str:
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """Return the set of stream names."""
        return self.connection.stream_names

Return the set of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
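Because `streams` behaves like a read-only mapping, the usual dict idioms apply. A sketch:

```python
# Iterate all synced streams without naming them individually.
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {sync_result.get_sql_table_name(stream_name)}")
```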

class JobStatusEnum(builtins.str, enum.Enum):
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'

An enumeration of possible Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
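Because the enum also subclasses `str`, members compare equal to their raw string values. A sketch:

```python
status = sync_result.get_job_status()

# Members compare equal to their raw values because the enum subclasses str:
assert JobStatusEnum.SUCCEEDED == "succeeded"

if status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED, JobStatusEnum.INCOMPLETE):
    print(f"Job did not succeed: {status}")
```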