# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling `.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()` to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an exception if the job fails. You can instead return immediately by setting `wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
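
You can also drill into the individual attempts of a job, including their logs, via `get_attempts()` (a short sketch):

```python
# Inspect each attempt of the sync job
for attempt in sync_result.get_attempts():
    print(f"Attempt {attempt.attempt_number}: {attempt.status}")
    print(f"  Records synced: {attempt.records_synced}")
    print(attempt.get_full_log_text())
```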

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
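
For example, to see which destination types support this feature:

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print the destination types that support reading synced records
print(READABLE_DESTINATION_TYPES)
```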

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
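
The `streams` property offers an equivalent mapping-style interface over all streams, and `get_sql_engine()` exposes a SQLAlchemy engine for ad-hoc queries. A brief sketch (`to_pandas()` assumes pandas is installed):

```python
# The `streams` property maps stream names to datasets
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")

# Load a single stream into a pandas DataFrame
users_df = sync_result.streams["users"].to_pandas()

# Or query the destination directly with SQLAlchemy
engine = sync_result.get_sql_engine()
with engine.connect() as conn:
    print(conn.execute(users_table.select().limit(10)).fetchall())
```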

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the raw attempt data, raising if it was not provided."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]