# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `CloudConnection` object. You can obtain this object by calling `CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `CloudConnection` object, we can simply call `run_sync()` to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an exception if the job fails. You can instead return immediately by setting `wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
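
You can also drill into the individual attempts of a job via `get_attempts()`. A sketch that summarizes each attempt and prints its logs:

```python
for attempt in sync_result.get_attempts():
    print(
        f"Attempt #{attempt.attempt_number}: {attempt.status} "
        f"({attempt.records_synced} records, {attempt.bytes_synced} bytes)"
    )
    print(attempt.get_full_log_text())
```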

### Reading data from an Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
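
The `streams` property exposes the same data as a mapping of stream names to datasets, and `get_sql_engine()` returns a SQLAlchemy engine for ad-hoc queries. A sketch, where the `users` stream name is illustrative:

```python
import sqlalchemy

# Iterate over all synced streams as datasets
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")

# Or query the destination directly with SQLAlchemy
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(f"Row count: {row_count}")
```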

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from an Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the raw attempt data, raising if none was provided."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(f"[{timestamp}] {level}: {message}")
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from the Config API, using a lazy-loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]
@dataclass
class SyncResult:

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult(workspace: airbyte.cloud.CloudWorkspace, connection: airbyte.cloud.CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte_api.models.jobresponse.JobResponse | None = None, _connection_response: airbyte_api.models.connectionresponse.ConnectionResponse | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Return the latest status of the sync job.

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records synced.

start_time: datetime.datetime

Return the start time of the sync job in UTC.

def get_attempts(self) -> list[SyncAttempt]:

Return a list of attempts for this sync job.
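
Assuming `sync_result` is an existing `SyncResult`, a quick check for whether the job needed retries (a sketch):

```python
attempts = sync_result.get_attempts()
if len(attempts) > 1:
    print(f"Job {sync_result.job_id} was retried {len(attempts) - 1} time(s)")
```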

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
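
A sketch of pairing this with exception handling; the import path is confirmed by the module source above:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync job {sync_result.job_id} failed: {ex}")
```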

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte_api.models.jobstatusenum.JobStatusEnum:

Wait for a job to finish running.
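
With raise_timeout=False, the method returns the latest (possibly non-final) status instead of raising on timeout. A sketch:

```python
from airbyte.cloud.constants import FINAL_STATUSES

status = sync_result.wait_for_completion(wait_timeout=60, raise_timeout=False)
if status not in FINAL_STATUSES:
    print(f"Job still running after 60 seconds; latest status: {status}")
```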

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL Cache object for working with the data in a SQL-based destination.

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.
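
For example, listing the tables in the destination schema with SQLAlchemy's inspector (a sketch):

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
inspector = sqlalchemy.inspect(engine)
print(inspector.get_table_names(schema=sync_result.get_sql_schema_name()))
```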

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
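
A sketch of typical usage; the "users" stream name is illustrative, and to_pandas() is assumed to be available on the returned dataset, as it is on other PyAirbyte datasets:

```python
dataset = sync_result.get_dataset("users")

# Load the stream into a pandas DataFrame for analysis
df = dataset.to_pandas()
print(df.head())
```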

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.

stream_names: list[str]

Return the list of stream names.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
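
A sketch of dict-style access:

```python
streams = sync_result.streams
print(f"Available streams: {list(streams)}")

if "users" in streams:  # "users" is an illustrative stream name
    for record in streams["users"]:
        print(record)
        break  # peek at the first record only
```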

@dataclass
class SyncAttempt:

Represents a single attempt of a sync job.

This class is not meant to be instantiated directly. Instead, obtain a SyncAttempt by calling SyncResult.get_attempts().

SyncAttempt(workspace: airbyte.cloud.CloudWorkspace, connection: airbyte.cloud.CloudConnection, job_id: int, attempt_number: int, _attempt_data: dict[str, typing.Any] | None = None)
job_id: int
attempt_number: int
attempt_id: int

Return the attempt ID.

status: str

Return the attempt status.

bytes_synced: int

Return the number of bytes synced in this attempt.

records_synced: int

Return the number of records synced in this attempt.

created_at: datetime.datetime

Return the creation time of the attempt.

def get_full_log_text(self) -> str:

Return the complete log text for this attempt.

Returns:

String containing all log text for this attempt, with lines separated by newlines.
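
For example, saving the logs of the final attempt for debugging (a sketch; the output path is illustrative):

```python
attempts = sync_result.get_attempts()
if attempts:
    with open("last_attempt.log", "w", encoding="utf-8") as f:
        f.write(attempts[-1].get_full_log_text())
```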