# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling `.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()` to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an exception if the job fails. You can instead return immediately by setting `wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
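
For example, you can check which destination types support reading before attempting it. A minimal sketch; `READABLE_DESTINATION_TYPES` is the constant named above:

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print the destination types that support reading records back
print(READABLE_DESTINATION_TYPES)
```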

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
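
The same datasets are also available through the `streams` property, a mapping keyed by stream name (see the class reference below). A short sketch; the `users` stream is a placeholder:

```python
# `streams` behaves like a Mapping[str, CachedDataset]
users_dataset = sync_result.streams["users"]
print(f"Available streams: {list(sync_result.streams)}")
```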

````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import TYPE_CHECKING, Any, final

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
]
````
@dataclass
class SyncResult:

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a `SyncResult` by interacting with the `.CloudWorkspace` and `.CloudConnection` objects.

SyncResult(workspace: airbyte.cloud.CloudWorkspace, connection: airbyte.cloud.CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None)
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
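
For example, a brief sketch of obtaining a `SyncResult` (here `connection` is a `CloudConnection`, as in the module examples above):

```python
# Obtain a SyncResult from a connection rather than constructing it directly
sync_result = connection.run_sync()           # start a new sync job
latest_result = connection.get_sync_result()  # or fetch the most recent job
```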
job_url: str

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Return the status of the sync job.

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records synced.

start_time: datetime.datetime

Return the start time of the sync job in UTC.

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
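
For example, a sketch that pairs this with a non-blocking sync (as shown in the module examples above):

```python
sync_result = connection.run_sync(wait=False)

# ... later: refresh the status and raise AirbyteConnectionSyncError if failed
sync_result.raise_failure_status(refresh_status=True)
```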

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Wait for a job to finish running.
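
For example, a sketch using the parameters from the signature above; with `raise_timeout=False`, a job that is still running when the timeout elapses returns its non-final status instead of raising:

```python
# Wait up to 10 minutes; raise AirbyteConnectionSyncError if the job fails
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,
    raise_failure=True,
)
print(f"Status after waiting: {status}")
```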

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.
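
For example, the table object can be used in SQLAlchemy queries against the engine from `get_sql_engine()`. A sketch; the `users` stream is a placeholder:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
users_table = sync_result.get_sql_table("users")

# Count the rows that landed in the destination table
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.select(sqlalchemy.func.count()).select_from(users_table)
    ).scalar_one()
print(f"users: {row_count} rows")
```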

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
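
For example, a sketch of reading a stream into a DataFrame (assumes your PyAirbyte version provides `CachedDataset.to_pandas()`; the `users` stream is a placeholder):

```python
dataset = sync_result.get_dataset("users")

# Convert the cached stream to a pandas DataFrame for analysis
df = dataset.to_pandas()
print(df.head())
```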

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.

stream_names: list[str]

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
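
For example, a brief sketch iterating the mapping (stream names depend on your connection):

```python
# Iterate over all streams and their datasets via the Mapping interface
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name} -> table {dataset.to_sql_table().name}")
```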