# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling `.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()` to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
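
Since `run_sync()` waits for completion by default, the result is final once it returns. You can also re-check the failure state explicitly afterwards with `raise_failure_status()`, which (per the source below) is a no-op unless the job failed:

```python
# Raises AirbyteConnectionSyncError if the job ended in a failed status;
# otherwise does nothing.
sync_result.raise_failure_status()
```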

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an exception if the job fails. You can instead return immediately by setting `wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
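
A job may consist of one or more attempts. You can inspect each attempt, including its logs, via `get_attempts()`; the sketch below uses the `SyncAttempt` properties defined in the source further down:

```python
# Inspect each attempt of the job
for attempt in sync_result.get_attempts():
    print(f"Attempt #{attempt.attempt_number}: {attempt.status}")
    print(f"  Records synced: {attempt.records_synced}")
    print(f"  Bytes synced: {attempt.bytes_synced}")

    # Full log text for the attempt (empty string if no logs are available)
    print(attempt.get_full_log_text())
```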

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
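
For a quick programmatic check, you can print the constant directly (assuming the import path named above):

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Destination types that support reading records back
print(READABLE_DESTINATION_TYPES)
```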

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
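
You can also treat the sync result as a mapping of stream names to datasets via the `streams` property, which (per the source below) is a convenience wrapper around `stream_names` and `get_dataset()`:

```python
# Iterate over every stream as a CachedDataset
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")
    for record in dataset:
        print(record)
```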

# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the raw attempt data provided at construction time."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from the Config API, caching the result lazily."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]