# airbyte.cloud.sync_results


Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
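
Alternatively, the result object exposes `wait_for_completion()` (documented below), which
blocks until the job reaches a final status. A minimal sketch, reusing the `sync_result`
from above:

```python
# Block until the job finishes (the default timeout is 30 minutes) and
# raise AirbyteConnectionSyncError if the job failed:
final_status = sync_result.wait_for_completion(raise_failure=True)
print(f"Final status: {final_status}")
```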

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
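
To check programmatically whether your destination type is readable, you can inspect that
constant directly. A quick sketch:

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print each destination type that supports reading records back:
for destination_type in READABLE_DESTINATION_TYPES:
    print(destination_type)
```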

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
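
If you prefer working in pandas, the dataset can also be loaded into a DataFrame. A small
sketch, assuming PyAirbyte's `to_pandas()` helper on the cached dataset:

```python
# Load the "users" stream into a pandas DataFrame:
users_df = dataset.to_pandas()
print(users_df.head())
```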
## DEFAULT_SYNC_TIMEOUT_SECONDS

DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes

The default timeout for waiting for a sync job to complete, in seconds.
## class SyncResult

The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `.CloudWorkspace` and `.CloudConnection` objects.

SyncResult(
    workspace: airbyte.cloud.CloudWorkspace,
    connection: airbyte.cloud.CloudConnection,
    job_id: str,
    table_name_prefix: str = '',
    table_name_suffix: str = '',
    _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None,
    _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None,
    _cache: airbyte.caches.CacheBase | None = None,
)
job_id: str
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str

Return the URL of the sync job.

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Return the current status of the sync job.

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records processed.

start_time: datetime.datetime

Return the start time of the sync job in UTC.
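
Since `start_time` is parsed from the job's ISO 8601 timestamp, it can be used for simple elapsed-time arithmetic. A sketch, assuming the API timestamp carries a UTC offset (so the parsed datetime is timezone-aware):

```python
from datetime import datetime, timezone

# How long ago did the job start?
elapsed = datetime.now(timezone.utc) - sync_result.start_time
print(f"Job started {elapsed.total_seconds():.0f} seconds ago")
```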

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
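
This pairs naturally with `run_sync(wait=False)`: poll until the job completes, then surface any failure. A brief sketch:

```python
import time

sync_result = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    time.sleep(5)

# Refresh the status and raise AirbyteConnectionSyncError if the job failed:
sync_result.raise_failure_status(refresh_status=True)
```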

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Wait for a job to finish running.
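
Per the signature above, `raise_timeout=False` makes a timeout return the latest (possibly non-final) status instead of raising `AirbyteConnectionSyncTimeoutError`. A sketch:

```python
# Wait up to 60 seconds; on timeout, return the latest status
# instead of raising an exception:
status = sync_result.wait_for_completion(
    wait_timeout=60,
    raise_timeout=False,
)
print(f"Status after waiting: {status}")
```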

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.
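
A minimal sketch of running ad-hoc SQL against the destination, combining the engine with `get_sql_table_name()`. It assumes a synced stream named `users`; depending on the destination, you may need to qualify the table with the schema name (see `get_sql_schema_name()` below):

```python
from sqlalchemy import text

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")

# Count the rows that landed in the destination table:
with engine.connect() as conn:
    row_count = conn.execute(text(f"SELECT COUNT(*) FROM {table_name}")).scalar()

print(f"{table_name} contains {row_count} rows")
```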

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.
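
The returned table can be used directly in SQLAlchemy Core queries. A short sketch, again assuming a `users` stream:

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")

# Select the first ten rows:
stmt = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```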

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.
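
Together with `get_sql_database_name()` and `get_sql_table_name()`, this lets you assemble a fully qualified table reference. A sketch assuming a simple `database.schema.table` convention (quoting and qualification rules vary by destination):

```python
qualified_name = ".".join(
    [
        sync_result.get_sql_database_name(),
        sync_result.get_sql_schema_name(),
        sync_result.get_sql_table_name("users"),
    ]
)
print(qualified_name)
```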

stream_names: list[str]

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
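
Because `streams` behaves like a read-only mapping, you can iterate every synced stream and its dataset in one pass. A closing sketch:

```python
# Print a record count for each synced stream:
for stream_name, dataset in sync_result.streams.items():
    record_count = sum(1 for _ in dataset)
    print(f"{stream_name}: {record_count} records")
```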