airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Examples

Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ Experimental Features

You can use the airbyte.cloud.experimental module to access experimental features. These additional features are subject to change and may not be available in all environments.

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
"""  # noqa: RUF002  # Allow emoji

from __future__ import annotations

from airbyte.cloud import connections, constants, sync_results, workspaces
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
@dataclass
class CloudWorkspace:
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    api_key: str
    api_root: str = CLOUD_API_ROOT

    @property
    def workspace_url(self) -> str | None:
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy and delete sources

    # TODO: Make this a public API
    def _deploy_source(
        self,
        source: Source,
    ) -> str:
        """Deploy a source to the workspace.

        Returns the newly deployed source ID.
        """
        source_configuration = source.get_config().copy()
        source_configuration["sourceType"] = source.name.replace("source-", "")

        deployed_source = create_source(
            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=source_configuration,
        )

        # Set the deployment IDs on the source object
        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing non-public API

        return deployed_source.source_id

    def _permanently_delete_source(
        self,
        source: str | Source,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, (str, Source)):
            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

        if isinstance(source, Source):
            if not source._deployed_source_id:  # noqa: SLF001
                raise ValueError("Source has not been deployed.")  # noqa: TRY003

            source_id = source._deployed_source_id  # noqa: SLF001

        elif isinstance(source, str):
            source_id = source

        delete_source(
            source_id=source_id,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete destinations

    # TODO: Make this a public API
    def _deploy_cache_as_destination(
        self,
        cache: CacheBase,
    ) -> str:
        """Deploy a cache to the workspace as a new destination.

        Returns the newly deployed destination ID.
        """
        cache_type_name = cache.__class__.__name__.replace("Cache", "")

        deployed_destination: DestinationResponse = create_destination(
            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=get_destination_config_from_cache(cache),
        )

        # Set the deployment IDs on the cache object
        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing non-public API

        return deployed_destination.destination_id

    def _permanently_delete_destination(
        self,
        *,
        destination: str | None = None,
        cache: CacheBase | None = None,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.
        """
        if destination is None and cache is None:
            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
        if destination is not None and cache is not None:
            raise ValueError(  # noqa: TRY003
                "You must provide either a destination ID or a cache object, not both."
            )

        if cache:
            if not cache._deployed_destination_id:  # noqa: SLF001
                raise ValueError("Cache has not been deployed.")  # noqa: TRY003

            destination = cache._deployed_destination_id  # noqa: SLF001

        if destination is None:
            raise ValueError("No destination ID provided.")  # noqa: TRY003

        delete_destination(
            destination_id=destination,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete connections

    # TODO: Make this a public API
    def _deploy_connection(
        self,
        source: Source | str,
        cache: CacheBase | None = None,
        destination: str | None = None,
        table_prefix: str | None = None,
        selected_streams: list[str] | None = None,
    ) -> CloudConnection:
        """Deploy a source and cache to the workspace as a new connection.

        Returns the newly deployed connection ID as a `str`.

        Args:
            source (Source | str): The source to deploy. You can pass either an already deployed
                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
                it will be deployed automatically.
            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
                `cache` or `destination`, but not both.
            destination (str, optional): The destination ID to use. You can provide
                `cache` or `destination`, but not both.
        """
        # Resolve source ID
        source_id: str
        if isinstance(source, Source):
            selected_streams = selected_streams or source.get_selected_streams()
            if source._deployed_source_id:  # noqa: SLF001
                source_id = source._deployed_source_id  # noqa: SLF001
            else:
                source_id = self._deploy_source(source)
        else:
            source_id = source
            if not selected_streams:
                raise exc.PyAirbyteInputError(
                    guidance="You must provide `selected_streams` when deploying a source ID."
                )

        # Resolve destination ID
        destination_id: str
        if destination:
            destination_id = destination
        elif cache:
            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
            if not cache._deployed_destination_id:  # noqa: SLF001
                destination_id = self._deploy_cache_as_destination(cache)
            else:
                destination_id = cache._deployed_destination_id  # noqa: SLF001
        else:
            raise exc.PyAirbyteInputError(
                guidance="You must provide either a destination ID or a cache object."
            )

        assert source_id is not None
        assert destination_id is not None

        deployed_connection = create_connection(
            name="Connection (Deployed by PyAirbyte)",
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
        )

        if isinstance(source, Source):
            source._deployed_api_root = self.api_root  # noqa: SLF001
            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            source._deployed_source_id = source_id  # noqa: SLF001
        if cache:
            cache._deployed_api_root = self.api_root  # noqa: SLF001
            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def _permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")  # noqa: TRY003

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        if delete_source:
            self._permanently_delete_source(source=connection.source_id)

        if delete_destination:
            self._permanently_delete_destination(destination=connection.destination_id)

    # Run syncs

    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Get sync results and previous sync logs

    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace(workspace_id: str, api_key: str, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_key: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        return f"{self.api_root}/workspaces/{self.workspace_id}"
def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
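For example, a minimal connectivity check, reusing the placeholder workspace ID and secret name from the module example above:

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
workspace.connect()  # Prints a success message, or raises if unreachable
```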

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.
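A short sketch of the lazy behavior, using the placeholder IDs from the module example (and assuming the `workspace` object from above):

```python
connection = workspace.get_connection(connection_id="456")  # No API call yet
print(connection.source_id)  # First property access triggers the API fetch
```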

def run_sync(self, connection_id: str, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

Run a sync on a deployed connection.
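For example, a non-blocking sync triggered directly from the workspace (a sketch assuming the `workspace` object and placeholder connection ID from above):

```python
# Trigger the sync without waiting for it to finish:
sync_result = workspace.run_sync(connection_id="456", wait=False)
print(sync_result.get_job_status())  # e.g. JobStatusEnum.RUNNING
```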

def get_sync_result(self, connection_id: str, job_id: str | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

Get the sync result for a connection job.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
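For example, inspecting the most recent job, if any (a sketch assuming the `workspace` object from above):

```python
latest_result = workspace.get_sync_result(connection_id="456")
if latest_result is None:
    print("No sync jobs found for this connection.")
else:
    print(latest_result.job_id, latest_result.get_job_status())
```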

def get_previous_sync_logs(self, connection_id: str, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )

Get the previous sync logs for a connection.
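For example, listing the five most recent jobs and their statuses (a sketch assuming the `workspace` object from above):

```python
for sync_result in workspace.get_previous_sync_logs(connection_id="456", limit=5):
    print(sync_result.job_id, sync_result.get_job_status())
```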

class CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=self.source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=self.destination_id,
            )

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection(workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

The ID of the source.

destination_id: str
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

The ID of the destination.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

The stream names.

table_prefix: str
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

The table prefix.

connection_url: str | None
    @property
    def connection_url(self) -> str | None:
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
job_history_url: str | None
    @property
    def job_history_url(self) -> str | None:
        return f"{self.connection_url}/job-history"
def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

Run a sync.
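For example, a blocking sync with a longer timeout, followed by the job's metrics (a sketch; the ten-minute timeout is arbitrary):

```python
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync(wait=True, wait_timeout=600)
print(f"Synced {sync_result.records_synced} records ({sync_result.bytes_synced} bytes)")
```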

def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

Get the previous sync logs for a connection.

def get_sync_result(self, job_id: str | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.
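For example, fetching a specific job lazily (the job ID is a placeholder; the job info is not fetched until the status is requested):

```python
sync_result = connection.get_sync_result(job_id="789")
print(sync_result.get_job_status())  # Triggers the API lookup
```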

@dataclass
class SyncResult:
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: str
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            connection_id=self.connection.connection_id,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return destination_response.configuration

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.
        """
        return CachedDataset(self.get_sql_cache(), stream_name=stream_name)

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            """Iterate over the stream names."""
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the airbyte.cloud.CloudWorkspace and airbyte.cloud.CloudConnection objects.

SyncResult(workspace: CloudWorkspace, connection: CloudConnection, job_id: str, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.base.CacheBase | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: str
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

Return the URL of the sync job.

def is_job_complete(self) -> bool:
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> JobStatusEnum:
    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

Return the status of the sync job.
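Together with `is_job_complete()`, this supports a manual polling loop (a sketch; the ten-second interval is arbitrary, and `wait_for_completion()` below is usually the better choice):

```python
import time

sync_result = connection.run_sync(wait=False)
while not sync_result.is_job_complete():
    time.sleep(10)
print(sync_result.get_job_status())
```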

bytes_synced: int
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced

Return the number of bytes synced.

records_synced: int
    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced

Return the number of records synced.

start_time: datetime.datetime
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

Return the start time of the sync job in UTC.

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
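For example, failing fast in a script (a sketch assuming a `sync_result` object from one of the examples above):

```python
# Refresh the status first, then raise only if the job actually failed:
sync_result.raise_failure_status(refresh_status=True)
```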

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> JobStatusEnum:
    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for a job to finish running.
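For example, waiting with a custom timeout and handling a slow job without raising (a sketch; the timeout value is arbitrary):

```python
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,  # Return the non-final status instead of raising
    raise_failure=False,
)
print(status)
```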

def get_sql_cache(self) -> airbyte.caches.base.CacheBase:
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.datasets._sql.CachedDataset:
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.
        """
        return CachedDataset(self.get_sql_cache(), stream_name=stream_name)

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.
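For example, pulling a stream into a SQLAlchemy table for inspection (a sketch; `"users"` is a placeholder stream name):

```python
dataset = sync_result.get_dataset("users")
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
print(list(users_table.columns.keys()))  # Column names via SQLAlchemy
```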

def get_sql_database_name(self) -> str:
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
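For example (a sketch assuming a completed `sync_result`; `streams` behaves like a read-only dict of stream name to `CachedDataset`):

```python
for stream_name, dataset in sync_result.streams.items():
    print(stream_name, dataset.to_sql_table().name)
```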

class JobStatusEnum(builtins.str, enum.Enum):
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'

An enumeration of Airbyte Cloud sync job statuses.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
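For example, branching on a job's final status (a sketch assuming a `sync_result` object from the examples above):

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED):
    print(f"Sync did not complete successfully: {status}")
```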