# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling `.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can call `run_sync()` to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` waits for the job to complete and raises an exception if the job fails. To return immediately instead, set `wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
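
To check support programmatically, you can inspect that constant directly; a quick sketch:

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print the destination types that support direct reads
print(READABLE_DESTINATION_TYPES)
```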

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
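
The `streams` property exposes the same data as a mapping of stream names to datasets, which is convenient for iterating over everything the connection synced. A short sketch:

```python
# Iterate over all synced streams as cached datasets
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")
```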

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can call `run_sync()`
to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` waits for the job to complete and raises an
exception if the job fails. To return immediately instead, set
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, final

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace

@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the current status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
]
@dataclass
class SyncResult:

The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `.CloudWorkspace` and `.CloudConnection` objects.

SyncResult(
    workspace: airbyte.cloud.CloudWorkspace,
    connection: airbyte.cloud.CloudConnection,
    job_id: int,
    table_name_prefix: str = '',
    table_name_suffix: str = '',
    _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None,
    _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None,
    _cache: airbyte.caches.CacheBase | None = None,
)
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str

Return the URL of the sync job.

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Return the current status of the sync job.

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records synced.

start_time: datetime.datetime

Return the start time of the sync job in UTC.
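
For example, to see how long ago a job started (a sketch; this assumes the parsed timestamp is timezone-aware UTC, which may not hold for every API response):

```python
from datetime import datetime, timezone

# Elapsed wall-clock time since the job started
elapsed = datetime.now(timezone.utc) - sync_result.start_time
print(f"Started {elapsed.total_seconds():.0f}s ago")
```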

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
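
A typical use is after a non-blocking sync, once polling shows the job has finished; a sketch:

```python
# After connection.run_sync(wait=False) and polling to completion:
# re-fetch the status, then raise if the job ended in a failed state.
sync_result.raise_failure_status(refresh_status=True)
```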

def wait_for_completion(
    self,
    *,
    wait_timeout: int = 1800,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Wait for a job to finish running.

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.
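
A sketch of querying the destination directly with the engine; the `users` stream and the unqualified table name are illustrative, and you may need to qualify the name with `get_sql_schema_name()` depending on the destination:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")  # hypothetical stream

with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(f"{table_name}: {row_count} rows")
```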

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.
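
With the SQLAlchemy `Table` object you can compose queries without writing raw SQL; a sketch assuming a `users` stream:

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")  # hypothetical stream
stmt = sqlalchemy.select(users_table).limit(10)

with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```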

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
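
Beyond iteration, `CachedDataset` objects in PyAirbyte generally support conversion helpers; a sketch, assuming the standard `to_pandas()` interface:

```python
dataset = sync_result.get_dataset("users")  # hypothetical stream

# Assumes CachedDataset provides a to_pandas() conversion helper
df = dataset.to_pandas()
print(df.head())
```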

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.
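
Together with `get_sql_database_name()` and `get_sql_table_name()`, this lets you build a fully qualified table name; a sketch (quoting rules vary by destination):

```python
# Fully qualified name: <database>.<schema>.<table>
qualified_name = ".".join(
    [
        sync_result.get_sql_database_name(),
        sync_result.get_sql_schema_name(),
        sync_result.get_sql_table_name("users"),  # hypothetical stream
    ]
)
print(qualified_name)
```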

stream_names: list[str]

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.