airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

Examples

Run a sync job and wait for completion

To get started, we'll need a CloudConnection object, which you can obtain by calling CloudWorkspace.get_connection().

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")

Once we have a CloudConnection object, we can call run_sync() to start a sync job and wait for it to complete.

from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()

Run a sync job and return immediately

By default, run_sync() waits for the job to complete and raises an exception if the job fails. To return immediately instead, set wait=False.

import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
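You do not have to write this loop by hand. SyncResult also provides wait_for_completion(wait_timeout=..., raise_timeout=..., raise_failure=...). It polls get_job_status() until the status is in FINAL_STATUSES or the timeout elapses.

The sketch below reproduces that control flow offline. It is not the library's code. FakeJob, the status strings, the two exception classes and the short poll interval are all hypothetical stand-ins, so the snippet runs without an Airbyte Cloud account.

```python
from __future__ import annotations

import time
from dataclasses import dataclass

# Hypothetical status values; the real sets are FINAL_STATUSES and
# FAILED_STATUSES in airbyte.cloud.constants.
FINAL_STATUSES = {"succeeded", "failed", "cancelled"}
FAILED_STATUSES = {"failed"}


class SyncFailedError(Exception):
    """Stand-in for AirbyteConnectionSyncError."""


class SyncTimeoutError(Exception):
    """Stand-in for AirbyteConnectionSyncTimeoutError."""


@dataclass
class FakeJob:
    """Hypothetical stand-in for SyncResult that replays scripted statuses."""

    statuses: list[str]
    calls: int = 0

    def get_job_status(self) -> str:
        # Return the next scripted status; repeat the last one once exhausted.
        status = self.statuses[min(self.calls, len(self.statuses) - 1)]
        self.calls += 1
        return status


def wait_for_completion(
    job: FakeJob,
    *,
    wait_timeout: float,
    raise_timeout: bool = True,
    raise_failure: bool = False,
    poll_interval: float = 0.01,
) -> str:
    """Poll until the job reaches a final status or wait_timeout seconds elapse."""
    start = time.monotonic()
    while True:
        status = job.get_job_status()
        if status in FINAL_STATUSES:
            if raise_failure and status in FAILED_STATUSES:
                raise SyncFailedError(status)
            return status
        if time.monotonic() - start > wait_timeout:
            if raise_timeout:
                raise SyncTimeoutError(status)
            return status  # Still a non-final status.
        time.sleep(poll_interval)


# Usage: the scripted job finishes on the third poll.
print(wait_for_completion(FakeJob(["running", "running", "succeeded"]), wait_timeout=5))  # prints: succeeded
```

The sketch measures elapsed time with time.monotonic(), which is unaffected by wall-clock adjustments. The module itself uses time.time().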

Examining the sync result

You can examine the sync result to get more information about the job:

sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
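The start_time value printed above may cost an extra request. The property parses the job's start time with ab_datetime_parse. If that fails with an "Invalid isoformat string" error, it re-fetches the job from the Config API (/jobs/get) and parses that response's startTime instead.

Below is a minimal stdlib-only sketch of that fallback. It assumes datetime.fromisoformat in place of ab_datetime_parse. The names parse_start_time and fetch_raw_start_time are hypothetical; the latter stands in for the Config API request.

```python
from __future__ import annotations

from collections.abc import Callable
from datetime import datetime


def parse_start_time(
    value: str | None,
    fetch_raw_start_time: Callable[[], str | None],
) -> datetime:
    """Parse a start time, falling back to a second source for malformed strings."""
    try:
        return datetime.fromisoformat(value)
    except (ValueError, TypeError) as e:
        # Only fall back for malformed ISO strings; re-raise anything else.
        if "Invalid isoformat string" in str(e):
            raw = fetch_raw_start_time()
            if raw:
                return datetime.fromisoformat(raw)
        raise


# Usage: the primary value is malformed, so the fallback source is used.
print(parse_start_time("not-a-date", lambda: "2024-05-06T07:08:09+00:00").isoformat())
```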

Reading data from Airbyte Cloud sync result

This feature is currently only available for specific SQL-based destinations, such as Snowflake and BigQuery. The list of supported destinations may be determined by inspecting the constant airbyte.cloud.constants.READABLE_DESTINATION_TYPES.

If your destination is supported, you can read records directly from the SyncResult object.

from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
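As a shortcut, SyncResult.streams returns a read-only Mapping from stream name to CachedDataset. It is built from stream_names and get_dataset(), so a dataset is only created when you look up its key.

Here is a minimal sketch of that lazy-mapping pattern using collections.abc.Mapping. LazyStreams and fake_get_dataset are hypothetical; the loader stands in for get_dataset().

```python
from __future__ import annotations

from collections.abc import Callable, Iterator, Mapping


class LazyStreams(Mapping[str, object]):
    """Read-only mapping from stream name to a dataset that is built on demand."""

    def __init__(self, names: list[str], load_dataset: Callable[[str], object]) -> None:
        self._names = list(names)
        self._load_dataset = load_dataset

    def __getitem__(self, key: str) -> object:
        # Raise KeyError for unknown names so `in` and `.get()` behave like a dict.
        if key not in self._names:
            raise KeyError(key)
        return self._load_dataset(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._names)

    def __len__(self) -> int:
        return len(self._names)


# Usage with a stand-in loader that records which streams were actually loaded.
loaded: list[str] = []


def fake_get_dataset(name: str) -> str:
    loaded.append(name)
    return f"<dataset {name}>"


streams = LazyStreams(["users", "orders"], fake_get_dataset)
print(list(streams))  # ['users', 'orders'] (no dataset loaded yet)
print(streams["users"])  # <dataset users>
```

The KeyError check is a choice made in this sketch. Mapping's inherited __contains__ and get() rely on __getitem__ raising KeyError for missing keys. As a side effect, an `in` check on a known name also triggers a load.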

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` waits for the job to complete and raises an
exception if the job fails. To return immediately instead, set
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The list of supported destinations may be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.caches._utils._dest_to_cache import destination_to_cache
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.cloud.models import CloudConnectionInfo, CloudJobInfo, JobStatusEnum
from airbyte.datasets import CachedDataset
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the `attempt` object from the provided attempt data."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: CloudJobInfo | None = None
    _connection_response: CloudConnectionInfo | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> CloudConnectionInfo:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = CloudConnectionInfo.from_api_response(
            api_util.get_connection(
                workspace_id=self.workspace.workspace_id,
                api_root=self.workspace.api_root,
                connection_id=self.connection.connection_id,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> CloudJobInfo:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = CloudJobInfo.from_api_response(
            api_util.get_job_info(
                job_id=self.job_id,
                api_root=self.workspace.api_root,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            config_api_root=self.workspace.config_api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=latest_status,
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for the sync job to reach a final status.

        Polls the job status every `api_util.JOB_WAIT_INTERVAL_SECS` seconds and returns the
        latest status. If `wait_timeout` seconds elapse first, raises
        `AirbyteConnectionSyncTimeoutError` when `raise_timeout=True`; otherwise returns the
        current (non-final) status. If `raise_failure=True`, a failed final status raises
        `AirbyteConnectionSyncError`.
        """
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op unless the final status is in FAILED_STATUSES:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]
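SyncAttempt.get_full_log_text() flattens one attempt's log payload into text. Structured events become "[timestamp] LEVEL: message" lines, and plain logLines are joined unchanged.

Here is a stdlib-only sketch of that formatting. It assumes the payload shape the module reads: a logs object holding either events or logLines. The function name format_attempt_logs is hypothetical.

```python
from __future__ import annotations

from typing import Any


def format_attempt_logs(attempt_data: dict[str, Any] | None) -> str:
    """Flatten one attempt's log payload into newline-separated text."""
    if not attempt_data:
        return ""
    logs = attempt_data.get("logs")
    if not logs:
        return ""
    if "events" in logs:
        # Structured events: render each as "[timestamp] LEVEL: message".
        return "\n".join(
            f"[{event.get('timestamp', '')}] {event.get('level', 'INFO')}: {event.get('message', '')}"
            for event in logs["events"] or []
        )
    if "logLines" in logs:
        # Plain log lines are already strings; join them unchanged.
        return "\n".join(logs["logLines"] or [])
    return ""


# Usage: an event with no level falls back to INFO.
print(format_attempt_logs({"logs": {"events": [{"timestamp": "t0", "message": "started"}]}}))
```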
@dataclass
class SyncResult:
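SyncResult caches several API responses on the instance. _fetch_latest_job_info() is the subtle case. It reuses the stored job info only once that info has a status in FINAL_STATUSES, because a finished job can no longer change. Otherwise it calls the API again on every access.

Here is a minimal sketch of that cache-when-final pattern. JobInfo, JobInfoCache, the status strings and fake_fetch are hypothetical stand-ins for CloudJobInfo and api_util.get_job_info().

```python
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass

FINAL_STATUSES = {"succeeded", "failed", "cancelled"}  # Hypothetical values.


@dataclass
class JobInfo:
    status: str


class JobInfoCache:
    """Return cached job info only after the job has reached a final status."""

    def __init__(self, fetch: Callable[[], JobInfo]) -> None:
        self._fetch = fetch
        self._latest: JobInfo | None = None

    def get(self) -> JobInfo:
        if self._latest is not None and self._latest.status in FINAL_STATUSES:
            return self._latest  # A final status never changes, so skip the API call.
        self._latest = self._fetch()
        return self._latest


statuses = ["running", "succeeded"]
fetch_calls: list[str] = []


def fake_fetch() -> JobInfo:
    # Stand-in for the API call: "running" first, then "succeeded" on every later call.
    status = statuses[min(len(fetch_calls), len(statuses) - 1)]
    fetch_calls.append(status)
    return JobInfo(status)


cache = JobInfoCache(fake_fetch)
for _ in range(4):
    cache.get()
print(len(fetch_calls))  # 2: one fetch while running, one that returned "succeeded"
```

By contrast, _fetch_job_with_attempts() keeps its first response regardless of status. As a result, get_attempts() called while a job is still running will not see later attempts on the same SyncResult instance.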
504            return iter(self.parent.stream_names)
505
506        def __len__(self) -> int:
507            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the .CloudWorkspace and .CloudConnection objects.

SyncResult(workspace: airbyte.cloud.CloudWorkspace, connection: airbyte.cloud.CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte.cloud.models.CloudJobInfo | None = None, _connection_response: airbyte.cloud.models.CloudConnectionInfo | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true

def is_job_complete(self) -> bool:

Check if the sync job is complete.

def get_job_status(self) -> airbyte.cloud.JobStatusEnum:

Return the latest status of the sync job.
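is_job_complete() only reports whether the status returned by get_job_status() is in FINAL_STATUSES. It does not say whether the job succeeded. The sketch below separates "complete" from "failed". It uses stand-ins: the JobStatus members and the contents of FINAL_STATUSES and FAILED_STATUSES are assumptions, not copied from airbyte.cloud.

```python
from enum import Enum


# Hypothetical stand-in for airbyte.cloud.JobStatusEnum (member names are assumed).
class JobStatus(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    CANCELLED = "cancelled"


# Assumed contents of the library's FINAL_STATUSES / FAILED_STATUSES constants.
FINAL_STATUSES = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.CANCELLED}
FAILED_STATUSES = {JobStatus.FAILED, JobStatus.CANCELLED}


def is_job_complete(status: JobStatus) -> bool:
    """Mirror SyncResult.is_job_complete(): 'complete' means the status is final."""
    return status in FINAL_STATUSES


def describe(status: JobStatus) -> str:
    """Classify a status as still running, failed, or succeeded."""
    if not is_job_complete(status):
        return "still running"
    return "failed" if status in FAILED_STATUSES else "succeeded"
```

In this sketch a cancelled job counts as complete and failed.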

bytes_synced: int

Return the number of bytes synced.

records_synced: int

Return the number of records processed.

start_time: datetime.datetime

Return the start time of the sync job in UTC.
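The source for start_time falls back to a raw config-API request when parsing fails with "Invalid isoformat string", which is the error text raised by datetime.fromisoformat(). The sketch below parses a timestamp into an aware UTC datetime and computes elapsed time using only the standard library. It is not the library's ab_datetime_parse(). The helper names are hypothetical, and the "Z" rewrite is needed because fromisoformat() accepts a trailing "Z" only from Python 3.11 onward.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_utc(timestamp: str) -> datetime:
    """Parse an ISO 8601 timestamp into an aware datetime in UTC.

    A trailing "Z" is rewritten to "+00:00" for older Python versions;
    naive inputs are assumed to already be in UTC.
    """
    if timestamp.endswith("Z"):
        timestamp = timestamp[:-1] + "+00:00"
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def elapsed_seconds(start: datetime, now: Optional[datetime] = None) -> float:
    """Return seconds elapsed since an aware UTC start time."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (now - start).total_seconds()
```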

def get_attempts(self) -> list[SyncAttempt]:

Return a list of attempts for this sync job.
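get_attempts() reads the "attempts" list from the job-with-attempts payload and numbers the attempts from 0 with enumerate(..., start=0). Each SyncAttempt then reads id, status, recordsSynced and bytesSynced from the nested "attempt" record, and the counts default to 0 when missing. The sketch below applies the same traversal to a plain dict. The payload is a made-up example shaped only after the keys the source reads, and AttemptSummary is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class AttemptSummary:
    attempt_number: int
    attempt_id: int
    status: str
    records_synced: int
    bytes_synced: int


def summarize_attempts(job_with_attempts: dict[str, Any]) -> list[AttemptSummary]:
    """Build one summary per attempt, numbered from 0 as get_attempts() does."""
    summaries = []
    for number, attempt_data in enumerate(job_with_attempts.get("attempts", []), start=0):
        attempt = attempt_data["attempt"]
        summaries.append(
            AttemptSummary(
                attempt_number=number,
                attempt_id=attempt["id"],
                status=attempt["status"],
                # Missing counters default to 0, matching SyncAttempt's properties.
                records_synced=attempt.get("recordsSynced", 0),
                bytes_synced=attempt.get("bytesSynced", 0),
            )
        )
    return summaries


# Hypothetical payload for a job that failed once and then succeeded.
payload = {
    "attempts": [
        {"attempt": {"id": 101, "status": "failed"}},
        {"attempt": {"id": 102, "status": "succeeded", "recordsSynced": 500, "bytesSynced": 20480}},
    ]
}
```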

def raise_failure_status(self, *, refresh_status: bool = False) -> None:

Raise an exception if the sync job failed.

By default, this method uses the latest status already available. To refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method raises an AirbyteConnectionSyncError.

Otherwise, do nothing.
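The choice between the cached status and a refreshed one is the core of raise_failure_status(). The sketch below mimics that choice with an injected status callback so it runs without an Airbyte workspace. JobChecker and SyncFailedError are hypothetical stand-ins, and the FAILED_STATUSES values are assumed.

```python
from typing import Callable, Optional

FAILED_STATUSES = {"failed", "cancelled"}  # assumption: mirrors the library constant


class SyncFailedError(Exception):
    """Hypothetical stand-in for AirbyteConnectionSyncError."""


class JobChecker:
    def __init__(self, fetch_status: Callable[[], str]) -> None:
        self._fetch_status = fetch_status
        self._latest_status: Optional[str] = None

    def get_job_status(self) -> str:
        """Fetch the current status and remember it."""
        self._latest_status = self._fetch_status()
        return self._latest_status

    def raise_failure_status(self, *, refresh_status: bool = False) -> None:
        """Raise if the job failed; reuse the cached status unless told to refresh."""
        if not refresh_status and self._latest_status is not None:
            status = self._latest_status
        else:
            status = self.get_job_status()
        if status in FAILED_STATUSES:
            raise SyncFailedError(f"Job ended with status {status!r}")
```

This sketch reports the status it just checked rather than fetching it a second time.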

def wait_for_completion(self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> airbyte.cloud.JobStatusEnum:

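Wait for the job to reach a final status, polling it at a fixed interval.

Returns the final status once it is reached. If wait_timeout seconds pass first, raises AirbyteConnectionSyncTimeoutError when raise_timeout is True; otherwise returns the latest, non-final status. If raise_failure is True and the job ends in a failed status, raises AirbyteConnectionSyncError.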
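The loop behind wait_for_completion() is a poll-with-deadline pattern. The sketch below reproduces its control flow with the status source, clock and sleep function passed in, so the timing can be tested without a real job. It swaps the library's time.time() for time.monotonic(), which is unaffected by wall-clock changes. The function name, SyncTimeoutError and the FINAL_STATUSES values are assumptions for illustration.

```python
import time
from typing import Callable

FINAL_STATUSES = {"succeeded", "failed", "cancelled"}  # assumed values


class SyncTimeoutError(Exception):
    """Hypothetical stand-in for AirbyteConnectionSyncTimeoutError."""


def wait_for_status(
    get_status: Callable[[], str],
    *,
    wait_timeout: float = 1800,
    poll_interval: float = 5,
    raise_timeout: bool = True,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status() until it returns a final status or the timeout elapses."""
    start = clock()
    while True:
        status = get_status()
        if status in FINAL_STATUSES:
            return status
        if clock() - start > wait_timeout:
            if raise_timeout:
                raise SyncTimeoutError(f"Still {status!r} after {wait_timeout}s")
            return status  # a non-final status
        sleep(poll_interval)
```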

def get_sql_cache(self) -> airbyte.caches.CacheBase:

Return a SQL Cache object for working with the data in a SQL-based destination. The cache is built from the destination configuration on the first call and reused on later calls.
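Because get_sql_cache() stores the cache it builds, repeated calls to get_sql_engine(), get_sql_table() and get_dataset() share one cache object. The sketch below shows that lazy-initialization pattern on its own. LazyValue is a hypothetical helper, not part of PyAirbyte. It tracks construction with a separate flag, so even a falsy value is built only once; the library's own check (if self._cache:) tests truthiness instead.

```python
from typing import Callable, Generic, Optional, TypeVar, cast

T = TypeVar("T")


class LazyValue(Generic[T]):
    """Build a value on first access and return the same object afterward."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._value: Optional[T] = None
        self._built = False

    def get(self) -> T:
        if not self._built:
            self._value = self._factory()
            self._built = True
        return cast(T, self._value)
```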

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:

Return the SQL database name.

def get_sql_schema_name(self) -> str:

Return the SQL schema name.
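Together, get_sql_database_name(), get_sql_schema_name() and get_sql_table_name(stream_name) identify where a stream's data lives. When writing raw SQL, you might combine them into a three-part name, as in the sketch below. It assumes ANSI double-quote quoting, as used by Snowflake and Postgres, and quoted identifiers are case-sensitive in Snowflake. BigQuery quotes with backticks instead. Both helper names are hypothetical. For dialect-aware quoting, get_sql_table() returns a SQLAlchemy Table that can be used directly.

```python
def quote_identifier(name: str) -> str:
    """Quote an identifier ANSI-style, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def qualified_table_name(database: str, schema: str, table: str) -> str:
    """Join database, schema and table into a quoted three-part name."""
    return ".".join(quote_identifier(part) for part in (database, schema, table))
```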

stream_names: list[str]

Return the list of stream names for the connection.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
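The streams property returns a read-only Mapping whose keys come from stream_names and whose values are built on lookup by get_dataset(). The sketch below reproduces that design with collections.abc.Mapping and a generic loader. LazyStreamMapping and the load callback are hypothetical. Unlike the source, the sketch raises KeyError for unknown names, which makes the Mapping mixins (in, get(), items()) behave as expected. Values are rebuilt on every lookup, so two lookups of the same key may return different objects.

```python
from collections.abc import Iterator, Mapping
from typing import Callable


class LazyStreamMapping(Mapping[str, object]):
    """Read-only mapping that builds each value when it is looked up."""

    def __init__(self, names: list[str], load: Callable[[str], object]) -> None:
        self._names = names
        self._load = load

    def __getitem__(self, key: str) -> object:
        if key not in self._names:
            raise KeyError(key)
        return self._load(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._names)

    def __len__(self) -> int:
        return len(self._names)
```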

@dataclass
class SyncAttempt:
@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the nested `attempt` record from the provided attempt data."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result

Represents a single attempt of a sync job.

This class is not meant to be instantiated directly. Instead, obtain a SyncAttempt by calling .SyncResult.get_attempts().

SyncAttempt(workspace: airbyte.cloud.CloudWorkspace, connection: airbyte.cloud.CloudConnection, job_id: int, attempt_number: int, _attempt_data: dict[str, typing.Any] | None = None)
job_id: int
attempt_number: int
attempt_id: int

Return the attempt ID.

status: str

Return the attempt status.

bytes_synced: int

Return the number of bytes synced in this attempt.

records_synced: int

Return the number of records synced in this attempt.

created_at: datetime.datetime

Return the creation time of the attempt.

def get_full_log_text(self) -> str:

Return the complete log text for this attempt.

Returns:

String containing all log text for this attempt, with lines separated by newlines.
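get_full_log_text() renders structured log events as "[timestamp] LEVEL: message" lines. If the payload has no "events" key, it joins raw "logLines" instead, and missing logs yield an empty string. The sketch below reproduces that formatting as a standalone function and adds a simple error filter for triage. Both function names are hypothetical, and the sample payload in the test is made up. Only the keys (events, timestamp, level, message, logLines) come from the source.

```python
from typing import Any, Optional


def format_attempt_logs(logs_data: Optional[dict[str, Any]]) -> str:
    """Render structured events as '[timestamp] LEVEL: message', else join raw lines."""
    if not logs_data:
        return ""
    if "events" in logs_data:
        return "\n".join(
            f"[{event.get('timestamp', '')}] {event.get('level', 'INFO')}: {event.get('message', '')}"
            for event in logs_data["events"] or []
        )
    return "\n".join(logs_data.get("logLines") or [])


def error_lines(log_text: str) -> list[str]:
    """Keep only lines containing 'ERROR' (a deliberately simple filter)."""
    return [line for line in log_text.splitlines() if "ERROR" in line]
```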