airbyte.cloud.sync_results      
                        Sync results for Airbyte Cloud workspaces.
Examples
Run a sync job and wait for completion
To get started, we'll need a .CloudConnection object. You can obtain this object by calling
.CloudWorkspace.get_connection().
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
# Get a connection object
connection = workspace.get_connection(connection_id="456")
Once we have a .CloudConnection object, we can simply call run_sync()
to start a sync job and wait for it to complete.
# Run a sync job
sync_result: SyncResult = connection.run_sync()
Run a sync job and return immediately
By default, run_sync() will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
wait=False.
# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)
while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)
print(f"Job is complete! Status: {sync_result.get_job_status()}")
Examining the sync result
You can examine the sync result to get more information about the job:
sync_result: SyncResult = connection.run_sync()
# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
Reading data from Airbyte Cloud sync result
This feature is currently only available for specific SQL-based destinations. This includes
SQL-based destinations such as Snowflake and BigQuery. The list of supported destinations may be
determined by inspecting the constant airbyte.cloud.constants.READABLE_DESTINATION_TYPES.
If your destination is supported, you can read records directly from the SyncResult object.
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()
# Print a list of available stream names
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
    print(record)
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Sync results for Airbyte Cloud workspaces. 3 4## Examples 5 6### Run a sync job and wait for completion 7 8To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling 9`.CloudWorkspace.get_connection()`. 10 11```python 12from airbyte import cloud 13 14# Initialize an Airbyte Cloud workspace object 15workspace = cloud.CloudWorkspace( 16 workspace_id="123", 17 api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"), 18) 19 20# Get a connection object 21connection = workspace.get_connection(connection_id="456") 22``` 23 24Once we have a `.CloudConnection` object, we can simply call `run_sync()` 25to start a sync job and wait for it to complete. 26 27```python 28# Run a sync job 29sync_result: SyncResult = connection.run_sync() 30``` 31 32### Run a sync job and return immediately 33 34By default, `run_sync()` will wait for the job to complete and raise an 35exception if the job fails. You can instead return immediately by setting 36`wait=False`. 37 38```python 39# Start the sync job and return immediately 40sync_result: SyncResult = connection.run_sync(wait=False) 41 42while not sync_result.is_job_complete(): 43 print("Job is still running...") 44 time.sleep(5) 45 46print(f"Job is complete! Status: {sync_result.get_job_status()}") 47``` 48 49### Examining the sync result 50 51You can examine the sync result to get more information about the job: 52 53```python 54sync_result: SyncResult = connection.run_sync() 55 56# Print the job details 57print( 58 f''' 59 Job ID: {sync_result.job_id} 60 Job URL: {sync_result.job_url} 61 Start Time: {sync_result.start_time} 62 Records Synced: {sync_result.records_synced} 63 Bytes Synced: {sync_result.bytes_synced} 64 Job Status: {sync_result.get_job_status()} 65 List of Stream Names: {', '.join(sync_result.stream_names)} 66 ''' 67) 68``` 69 70### Reading data from Airbyte Cloud sync result 71 72**This feature is currently only available for specific SQL-based destinations.** This includes 73SQL-based destinations such as Snowflake and BigQuery. The list of supported destinations may be 74determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`. 75 76If your destination is supported, you can read records directly from the SyncResult object. 77 78```python 79# Assuming we've already created a `connection` object... 80sync_result = connection.get_sync_result() 81 82# Print a list of available stream names 83print(sync_result.stream_names) 84 85# Get a dataset from the sync result 86dataset: CachedDataset = sync_result.get_dataset("users") 87 88# Get the SQLAlchemy table to use in SQL queries... 89users_table = dataset.to_sql_table() 90print(f"Table name: {users_table.name}") 91 92# Or iterate over the dataset directly 93for record in dataset: 94 print(record) 95``` 96 97------ 98 99""" 100 101from __future__ import annotations 102 103import time 104from collections.abc import Iterator, Mapping 105from dataclasses import asdict, dataclass 106from typing import TYPE_CHECKING, Any 107 108from typing_extensions import final 109 110from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse 111 112from airbyte._util import api_util 113from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES 114from airbyte.datasets import CachedDataset 115from airbyte.destinations._translate_dest_to_cache import destination_to_cache 116from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError 117 118 119DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60 # 30 minutes 120"""The default timeout for waiting for a sync job to complete, in seconds.""" 121 122if TYPE_CHECKING: 123 from datetime import datetime 124 125 import sqlalchemy 126 127 from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum 128 from airbyte.caches.base import CacheBase 129 from airbyte.cloud.connections import CloudConnection 130 from airbyte.cloud.workspaces import CloudWorkspace 131 132 133@dataclass 134class SyncAttempt: 135 """Represents a single attempt of a sync job. 136 137 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by 138 calling `.SyncResult.get_attempts()`. 139 """ 140 141 workspace: CloudWorkspace 142 connection: CloudConnection 143 job_id: int 144 attempt_number: int 145 _attempt_data: dict[str, Any] | None = None 146 147 @property 148 def attempt_id(self) -> int: 149 """Return the attempt ID.""" 150 return self._get_attempt_data()["id"] 151 152 @property 153 def status(self) -> str: 154 """Return the attempt status.""" 155 return self._get_attempt_data()["status"] 156 157 @property 158 def bytes_synced(self) -> int: 159 """Return the number of bytes synced in this attempt.""" 160 return self._get_attempt_data().get("bytesSynced", 0) 161 162 @property 163 def records_synced(self) -> int: 164 """Return the number of records synced in this attempt.""" 165 return self._get_attempt_data().get("recordsSynced", 0) 166 167 @property 168 def created_at(self) -> datetime: 169 """Return the creation time of the attempt.""" 170 timestamp = self._get_attempt_data()["createdAt"] 171 return ab_datetime_parse(timestamp) 172 173 def _get_attempt_data(self) -> dict[str, Any]: 174 """Get attempt data from the provided attempt data.""" 175 if self._attempt_data is None: 176 raise ValueError( 177 "Attempt data not provided. SyncAttempt should be created via " 178 "SyncResult.get_attempts()." 179 ) 180 return self._attempt_data["attempt"] 181 182 def get_full_log_text(self) -> str: 183 """Return the complete log text for this attempt. 184 185 Returns: 186 String containing all log text for this attempt, with lines separated by newlines. 187 """ 188 if self._attempt_data is None: 189 return "" 190 191 logs_data = self._attempt_data.get("logs") 192 if not logs_data: 193 return "" 194 195 result = "" 196 197 if "events" in logs_data: 198 log_events = logs_data["events"] 199 if log_events: 200 log_lines = [] 201 for event in log_events: 202 timestamp = event.get("timestamp", "") 203 level = event.get("level", "INFO") 204 message = event.get("message", "") 205 log_lines.append( 206 f"[{timestamp}] {level}: {message}" # pyrefly: ignore[bad-argument-type] 207 ) 208 result = "\n".join(log_lines) 209 elif "logLines" in logs_data: 210 log_lines = logs_data["logLines"] 211 if log_lines: 212 result = "\n".join(log_lines) 213 214 return result 215 216 217@dataclass 218class SyncResult: 219 """The result of a sync operation. 220 221 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by 222 interacting with the `.CloudWorkspace` and `.CloudConnection` objects. 223 """ 224 225 workspace: CloudWorkspace 226 connection: CloudConnection 227 job_id: int 228 table_name_prefix: str = "" 229 table_name_suffix: str = "" 230 _latest_job_info: JobResponse | None = None 231 _connection_response: ConnectionResponse | None = None 232 _cache: CacheBase | None = None 233 _job_with_attempts_info: dict[str, Any] | None = None 234 235 @property 236 def job_url(self) -> str: 237 """Return the URL of the sync job. 238 239 Note: This currently returns the connection's job history URL, as there is no direct URL 240 to a specific job in the Airbyte Cloud web app. 241 242 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 243 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 244 """ 245 return f"{self.connection.job_history_url}" 246 247 def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: 248 """Return connection info for the sync job.""" 249 if self._connection_response and not force_refresh: 250 return self._connection_response 251 252 self._connection_response = api_util.get_connection( 253 workspace_id=self.workspace.workspace_id, 254 api_root=self.workspace.api_root, 255 connection_id=self.connection.connection_id, 256 client_id=self.workspace.client_id, 257 client_secret=self.workspace.client_secret, 258 ) 259 return self._connection_response 260 261 def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]: 262 """Return the destination configuration for the sync job.""" 263 connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh) 264 destination_response = api_util.get_destination( 265 destination_id=connection_info.destination_id, 266 api_root=self.workspace.api_root, 267 client_id=self.workspace.client_id, 268 client_secret=self.workspace.client_secret, 269 ) 270 return asdict(destination_response.configuration) 271 272 def is_job_complete(self) -> bool: 273 """Check if the sync job is complete.""" 274 return self.get_job_status() in FINAL_STATUSES 275 276 def get_job_status(self) -> JobStatusEnum: 277 """Check if the sync job is still running.""" 278 return self._fetch_latest_job_info().status 279 280 def _fetch_latest_job_info(self) -> JobResponse: 281 """Return the job info for the sync job.""" 282 if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES: 283 return self._latest_job_info 284 285 self._latest_job_info = api_util.get_job_info( 286 job_id=self.job_id, 287 api_root=self.workspace.api_root, 288 client_id=self.workspace.client_id, 289 client_secret=self.workspace.client_secret, 290 ) 291 return self._latest_job_info 292 293 @property 294 def bytes_synced(self) -> int: 295 """Return the number of records processed.""" 296 return self._fetch_latest_job_info().bytes_synced or 0 297 298 @property 299 def records_synced(self) -> int: 300 """Return the number of records processed.""" 301 return self._fetch_latest_job_info().rows_synced or 0 302 303 @property 304 def start_time(self) -> datetime: 305 """Return the start time of the sync job in UTC.""" 306 try: 307 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 308 except (ValueError, TypeError) as e: 309 if "Invalid isoformat string" in str(e): 310 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 311 api_root=self.workspace.api_root, 312 path="/jobs/get", 313 json={"id": self.job_id}, 314 client_id=self.workspace.client_id, 315 client_secret=self.workspace.client_secret, 316 ) 317 raw_start_time = job_info_raw.get("startTime") 318 if raw_start_time: 319 return ab_datetime_parse(raw_start_time) 320 raise 321 322 def _fetch_job_with_attempts(self) -> dict[str, Any]: 323 """Fetch job info with attempts from Config API using lazy loading pattern.""" 324 if self._job_with_attempts_info is not None: 325 return self._job_with_attempts_info 326 327 self._job_with_attempts_info = api_util._make_config_api_request( # noqa: SLF001 # Config API helper 328 api_root=self.workspace.api_root, 329 path="/jobs/get", 330 json={ 331 "id": self.job_id, 332 }, 333 client_id=self.workspace.client_id, 334 client_secret=self.workspace.client_secret, 335 ) 336 return self._job_with_attempts_info 337 338 def get_attempts(self) -> list[SyncAttempt]: 339 """Return a list of attempts for this sync job.""" 340 job_with_attempts = self._fetch_job_with_attempts() 341 attempts_data = job_with_attempts.get("attempts", []) 342 343 return [ 344 SyncAttempt( 345 workspace=self.workspace, 346 connection=self.connection, 347 job_id=self.job_id, 348 attempt_number=i, 349 _attempt_data=attempt_data, 350 ) 351 for i, attempt_data in enumerate(attempts_data, start=0) 352 ] 353 354 def raise_failure_status( 355 self, 356 *, 357 refresh_status: bool = False, 358 ) -> None: 359 """Raise an exception if the sync job failed. 360 361 By default, this method will use the latest status available. If you want to refresh the 362 status before checking for failure, set `refresh_status=True`. If the job has failed, this 363 method will raise a `AirbyteConnectionSyncError`. 364 365 Otherwise, do nothing. 366 """ 367 if not refresh_status and self._latest_job_info: 368 latest_status = self._latest_job_info.status 369 else: 370 latest_status = self.get_job_status() 371 372 if latest_status in FAILED_STATUSES: 373 raise AirbyteConnectionSyncError( 374 workspace=self.workspace, 375 connection_id=self.connection.connection_id, 376 job_id=self.job_id, 377 job_status=self.get_job_status(), 378 ) 379 380 def wait_for_completion( 381 self, 382 *, 383 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 384 raise_timeout: bool = True, 385 raise_failure: bool = False, 386 ) -> JobStatusEnum: 387 """Wait for a job to finish running.""" 388 start_time = time.time() 389 while True: 390 latest_status = self.get_job_status() 391 if latest_status in FINAL_STATUSES: 392 if raise_failure: 393 # No-op if the job succeeded or is still running: 394 self.raise_failure_status() 395 396 return latest_status 397 398 if time.time() - start_time > wait_timeout: 399 if raise_timeout: 400 raise AirbyteConnectionSyncTimeoutError( 401 workspace=self.workspace, 402 connection_id=self.connection.connection_id, 403 job_id=self.job_id, 404 job_status=latest_status, 405 timeout=wait_timeout, 406 ) 407 408 return latest_status # This will be a non-final status 409 410 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS) 411 412 def get_sql_cache(self) -> CacheBase: 413 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 414 if self._cache: 415 return self._cache 416 417 destination_configuration = self._get_destination_configuration() 418 self._cache = destination_to_cache(destination_configuration=destination_configuration) 419 return self._cache 420 421 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 422 """Return a SQL Engine for querying a SQL-based destination.""" 423 return self.get_sql_cache().get_sql_engine() 424 425 def get_sql_table_name(self, stream_name: str) -> str: 426 """Return the SQL table name of the named stream.""" 427 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name) 428 429 def get_sql_table( 430 self, 431 stream_name: str, 432 ) -> sqlalchemy.Table: 433 """Return a SQLAlchemy table object for the named stream.""" 434 return self.get_sql_cache().processor.get_sql_table(stream_name) 435 436 def get_dataset(self, stream_name: str) -> CachedDataset: 437 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 438 439 This can be used to read and analyze the data in a SQL-based destination. 440 441 TODO: In a future iteration, we can consider providing stream configuration information 442 (catalog information) to the `CachedDataset` object via the "Get stream properties" 443 API: https://reference.airbyte.com/reference/getstreamproperties 444 """ 445 return CachedDataset( 446 self.get_sql_cache(), 447 stream_name=stream_name, 448 stream_configuration=False, # Don't look for stream configuration in cache. 449 ) 450 451 def get_sql_database_name(self) -> str: 452 """Return the SQL database name.""" 453 cache = self.get_sql_cache() 454 return cache.get_database_name() 455 456 def get_sql_schema_name(self) -> str: 457 """Return the SQL schema name.""" 458 cache = self.get_sql_cache() 459 return cache.schema_name 460 461 @property 462 def stream_names(self) -> list[str]: 463 """Return the set of stream names.""" 464 return self.connection.stream_names 465 466 @final 467 @property 468 def streams( 469 self, 470 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 471 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 472 473 This is a convenience wrapper around the `stream_names` 474 property and `get_dataset()` method. 475 """ 476 return self._SyncResultStreams(self) 477 478 class _SyncResultStreams(Mapping[str, CachedDataset]): 479 """A mapping of stream names to cached datasets.""" 480 481 def __init__( 482 self, 483 parent: SyncResult, 484 /, 485 ) -> None: 486 self.parent: SyncResult = parent 487 488 def __getitem__(self, key: str) -> CachedDataset: 489 return self.parent.get_dataset(stream_name=key) 490 491 def __iter__(self) -> Iterator[str]: 492 return iter(self.parent.stream_names) 493 494 def __len__(self) -> int: 495 return len(self.parent.stream_names) 496 497 498__all__ = [ 499 "SyncResult", 500 "SyncAttempt", 501]
218@dataclass 219class SyncResult: 220 """The result of a sync operation. 221 222 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by 223 interacting with the `.CloudWorkspace` and `.CloudConnection` objects. 224 """ 225 226 workspace: CloudWorkspace 227 connection: CloudConnection 228 job_id: int 229 table_name_prefix: str = "" 230 table_name_suffix: str = "" 231 _latest_job_info: JobResponse | None = None 232 _connection_response: ConnectionResponse | None = None 233 _cache: CacheBase | None = None 234 _job_with_attempts_info: dict[str, Any] | None = None 235 236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}" 247 248 def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: 249 """Return connection info for the sync job.""" 250 if self._connection_response and not force_refresh: 251 return self._connection_response 252 253 self._connection_response = api_util.get_connection( 254 workspace_id=self.workspace.workspace_id, 255 api_root=self.workspace.api_root, 256 connection_id=self.connection.connection_id, 257 client_id=self.workspace.client_id, 258 client_secret=self.workspace.client_secret, 259 ) 260 return self._connection_response 261 262 def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]: 263 """Return the destination configuration for the sync job.""" 264 connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh) 265 destination_response = api_util.get_destination( 266 destination_id=connection_info.destination_id, 267 api_root=self.workspace.api_root, 268 client_id=self.workspace.client_id, 269 client_secret=self.workspace.client_secret, 270 ) 271 return asdict(destination_response.configuration) 272 273 def is_job_complete(self) -> bool: 274 """Check if the sync job is complete.""" 275 return self.get_job_status() in FINAL_STATUSES 276 277 def get_job_status(self) -> JobStatusEnum: 278 """Check if the sync job is still running.""" 279 return self._fetch_latest_job_info().status 280 281 def _fetch_latest_job_info(self) -> JobResponse: 282 """Return the job info for the sync job.""" 283 if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES: 284 return self._latest_job_info 285 286 self._latest_job_info = api_util.get_job_info( 287 job_id=self.job_id, 288 api_root=self.workspace.api_root, 289 client_id=self.workspace.client_id, 290 client_secret=self.workspace.client_secret, 291 ) 292 return self._latest_job_info 293 294 @property 295 def bytes_synced(self) -> int: 296 """Return the number of records processed.""" 297 return self._fetch_latest_job_info().bytes_synced or 0 298 299 @property 300 def records_synced(self) -> int: 301 """Return the number of records processed.""" 302 return self._fetch_latest_job_info().rows_synced or 0 303 304 @property 305 def start_time(self) -> datetime: 306 """Return the start time of the sync job in UTC.""" 307 try: 308 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 309 except (ValueError, TypeError) as e: 310 if "Invalid isoformat string" in str(e): 311 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 312 api_root=self.workspace.api_root, 313 path="/jobs/get", 314 json={"id": self.job_id}, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 ) 318 raw_start_time = job_info_raw.get("startTime") 319 if raw_start_time: 320 return ab_datetime_parse(raw_start_time) 321 raise 322 323 def _fetch_job_with_attempts(self) -> dict[str, Any]: 324 """Fetch job info with attempts from Config API using lazy loading pattern.""" 325 if self._job_with_attempts_info is not None: 326 return self._job_with_attempts_info 327 328 self._job_with_attempts_info = api_util._make_config_api_request( # noqa: SLF001 # Config API helper 329 api_root=self.workspace.api_root, 330 path="/jobs/get", 331 json={ 332 "id": self.job_id, 333 }, 334 client_id=self.workspace.client_id, 335 client_secret=self.workspace.client_secret, 336 ) 337 return self._job_with_attempts_info 338 339 def get_attempts(self) -> list[SyncAttempt]: 340 """Return a list of attempts for this sync job.""" 341 job_with_attempts = self._fetch_job_with_attempts() 342 attempts_data = job_with_attempts.get("attempts", []) 343 344 return [ 345 SyncAttempt( 346 workspace=self.workspace, 347 connection=self.connection, 348 job_id=self.job_id, 349 attempt_number=i, 350 _attempt_data=attempt_data, 351 ) 352 for i, attempt_data in enumerate(attempts_data, start=0) 353 ] 354 355 def raise_failure_status( 356 self, 357 *, 358 refresh_status: bool = False, 359 ) -> None: 360 """Raise an exception if the sync job failed. 361 362 By default, this method will use the latest status available. If you want to refresh the 363 status before checking for failure, set `refresh_status=True`. If the job has failed, this 364 method will raise a `AirbyteConnectionSyncError`. 365 366 Otherwise, do nothing. 367 """ 368 if not refresh_status and self._latest_job_info: 369 latest_status = self._latest_job_info.status 370 else: 371 latest_status = self.get_job_status() 372 373 if latest_status in FAILED_STATUSES: 374 raise AirbyteConnectionSyncError( 375 workspace=self.workspace, 376 connection_id=self.connection.connection_id, 377 job_id=self.job_id, 378 job_status=self.get_job_status(), 379 ) 380 381 def wait_for_completion( 382 self, 383 *, 384 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 385 raise_timeout: bool = True, 386 raise_failure: bool = False, 387 ) -> JobStatusEnum: 388 """Wait for a job to finish running.""" 389 start_time = time.time() 390 while True: 391 latest_status = self.get_job_status() 392 if latest_status in FINAL_STATUSES: 393 if raise_failure: 394 # No-op if the job succeeded or is still running: 395 self.raise_failure_status() 396 397 return latest_status 398 399 if time.time() - start_time > wait_timeout: 400 if raise_timeout: 401 raise AirbyteConnectionSyncTimeoutError( 402 workspace=self.workspace, 403 connection_id=self.connection.connection_id, 404 job_id=self.job_id, 405 job_status=latest_status, 406 timeout=wait_timeout, 407 ) 408 409 return latest_status # This will be a non-final status 410 411 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS) 412 413 def get_sql_cache(self) -> CacheBase: 414 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 415 if self._cache: 416 return self._cache 417 418 destination_configuration = self._get_destination_configuration() 419 self._cache = destination_to_cache(destination_configuration=destination_configuration) 420 return self._cache 421 422 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 423 """Return a SQL Engine for querying a SQL-based destination.""" 424 return self.get_sql_cache().get_sql_engine() 425 426 def get_sql_table_name(self, stream_name: str) -> str: 427 """Return the SQL table name of the named stream.""" 428 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name) 429 430 def get_sql_table( 431 self, 432 stream_name: str, 433 ) -> sqlalchemy.Table: 434 """Return a SQLAlchemy table object for the named stream.""" 435 return self.get_sql_cache().processor.get_sql_table(stream_name) 436 437 def get_dataset(self, stream_name: str) -> CachedDataset: 438 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 439 440 This can be used to read and analyze the data in a SQL-based destination. 441 442 TODO: In a future iteration, we can consider providing stream configuration information 443 (catalog information) to the `CachedDataset` object via the "Get stream properties" 444 API: https://reference.airbyte.com/reference/getstreamproperties 445 """ 446 return CachedDataset( 447 self.get_sql_cache(), 448 stream_name=stream_name, 449 stream_configuration=False, # Don't look for stream configuration in cache. 450 ) 451 452 def get_sql_database_name(self) -> str: 453 """Return the SQL database name.""" 454 cache = self.get_sql_cache() 455 return cache.get_database_name() 456 457 def get_sql_schema_name(self) -> str: 458 """Return the SQL schema name.""" 459 cache = self.get_sql_cache() 460 return cache.schema_name 461 462 @property 463 def stream_names(self) -> list[str]: 464 """Return the set of stream names.""" 465 return self.connection.stream_names 466 467 @final 468 @property 469 def streams( 470 self, 471 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 472 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 473 474 This is a convenience wrapper around the `stream_names` 475 property and `get_dataset()` method. 476 """ 477 return self._SyncResultStreams(self) 478 479 class _SyncResultStreams(Mapping[str, CachedDataset]): 480 """A mapping of stream names to cached datasets.""" 481 482 def __init__( 483 self, 484 parent: SyncResult, 485 /, 486 ) -> None: 487 self.parent: SyncResult = parent 488 489 def __getitem__(self, key: str) -> CachedDataset: 490 return self.parent.get_dataset(stream_name=key) 491 492 def __iter__(self) -> Iterator[str]: 493 return iter(self.parent.stream_names) 494 495 def __len__(self) -> int: 496 return len(self.parent.stream_names)
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by
interacting with the .CloudWorkspace and .CloudConnection objects.
236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
273 def is_job_complete(self) -> bool: 274 """Check if the sync job is complete.""" 275 return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
277 def get_job_status(self) -> JobStatusEnum: 278 """Check if the sync job is still running.""" 279 return self._fetch_latest_job_info().status
Check if the sync job is still running.
294 @property 295 def bytes_synced(self) -> int: 296 """Return the number of records processed.""" 297 return self._fetch_latest_job_info().bytes_synced or 0
Return the number of records processed.
299 @property 300 def records_synced(self) -> int: 301 """Return the number of records processed.""" 302 return self._fetch_latest_job_info().rows_synced or 0
Return the number of records processed.
304 @property 305 def start_time(self) -> datetime: 306 """Return the start time of the sync job in UTC.""" 307 try: 308 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 309 except (ValueError, TypeError) as e: 310 if "Invalid isoformat string" in str(e): 311 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 312 api_root=self.workspace.api_root, 313 path="/jobs/get", 314 json={"id": self.job_id}, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 ) 318 raw_start_time = job_info_raw.get("startTime") 319 if raw_start_time: 320 return ab_datetime_parse(raw_start_time) 321 raise
Return the start time of the sync job in UTC.
339 def get_attempts(self) -> list[SyncAttempt]: 340 """Return a list of attempts for this sync job.""" 341 job_with_attempts = self._fetch_job_with_attempts() 342 attempts_data = job_with_attempts.get("attempts", []) 343 344 return [ 345 SyncAttempt( 346 workspace=self.workspace, 347 connection=self.connection, 348 job_id=self.job_id, 349 attempt_number=i, 350 _attempt_data=attempt_data, 351 ) 352 for i, attempt_data in enumerate(attempts_data, start=0) 353 ]
Return a list of attempts for this sync job.
355 def raise_failure_status( 356 self, 357 *, 358 refresh_status: bool = False, 359 ) -> None: 360 """Raise an exception if the sync job failed. 361 362 By default, this method will use the latest status available. If you want to refresh the 363 status before checking for failure, set `refresh_status=True`. If the job has failed, this 364 method will raise a `AirbyteConnectionSyncError`. 365 366 Otherwise, do nothing. 367 """ 368 if not refresh_status and self._latest_job_info: 369 latest_status = self._latest_job_info.status 370 else: 371 latest_status = self.get_job_status() 372 373 if latest_status in FAILED_STATUSES: 374 raise AirbyteConnectionSyncError( 375 workspace=self.workspace, 376 connection_id=self.connection.connection_id, 377 job_id=self.job_id, 378 job_status=self.get_job_status(), 379 )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set refresh_status=True. If the job has failed, this
method will raise a AirbyteConnectionSyncError.
Otherwise, do nothing.
381 def wait_for_completion( 382 self, 383 *, 384 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 385 raise_timeout: bool = True, 386 raise_failure: bool = False, 387 ) -> JobStatusEnum: 388 """Wait for a job to finish running.""" 389 start_time = time.time() 390 while True: 391 latest_status = self.get_job_status() 392 if latest_status in FINAL_STATUSES: 393 if raise_failure: 394 # No-op if the job succeeded or is still running: 395 self.raise_failure_status() 396 397 return latest_status 398 399 if time.time() - start_time > wait_timeout: 400 if raise_timeout: 401 raise AirbyteConnectionSyncTimeoutError( 402 workspace=self.workspace, 403 connection_id=self.connection.connection_id, 404 job_id=self.job_id, 405 job_status=latest_status, 406 timeout=wait_timeout, 407 ) 408 409 return latest_status # This will be a non-final status 410 411 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
413 def get_sql_cache(self) -> CacheBase: 414 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 415 if self._cache: 416 return self._cache 417 418 destination_configuration = self._get_destination_configuration() 419 self._cache = destination_to_cache(destination_configuration=destination_configuration) 420 return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination's.
422 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 423 """Return a SQL Engine for querying a SQL-based destination.""" 424 return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
426 def get_sql_table_name(self, stream_name: str) -> str: 427 """Return the SQL table name of the named stream.""" 428 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
430 def get_sql_table( 431 self, 432 stream_name: str, 433 ) -> sqlalchemy.Table: 434 """Return a SQLAlchemy table object for the named stream.""" 435 return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
437 def get_dataset(self, stream_name: str) -> CachedDataset: 438 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 439 440 This can be used to read and analyze the data in a SQL-based destination. 441 442 TODO: In a future iteration, we can consider providing stream configuration information 443 (catalog information) to the `CachedDataset` object via the "Get stream properties" 444 API: https://reference.airbyte.com/reference/getstreamproperties 445 """ 446 return CachedDataset( 447 self.get_sql_cache(), 448 stream_name=stream_name, 449 stream_configuration=False, # Don't look for stream configuration in cache. 450 )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
      (catalog information) to the CachedDataset object via the "Get stream properties"
      API: https://reference.airbyte.com/reference/getstreamproperties
452 def get_sql_database_name(self) -> str: 453 """Return the SQL database name.""" 454 cache = self.get_sql_cache() 455 return cache.get_database_name()
Return the SQL database name.
457 def get_sql_schema_name(self) -> str: 458 """Return the SQL schema name.""" 459 cache = self.get_sql_cache() 460 return cache.schema_name
Return the SQL schema name.
462 @property 463 def stream_names(self) -> list[str]: 464 """Return the set of stream names.""" 465 return self.connection.stream_names
Return the set of stream names.
467 @final 468 @property 469 def streams( 470 self, 471 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 472 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 473 474 This is a convenience wrapper around the `stream_names` 475 property and `get_dataset()` method. 476 """ 477 return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names
property and get_dataset() method.
134@dataclass 135class SyncAttempt: 136 """Represents a single attempt of a sync job. 137 138 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by 139 calling `.SyncResult.get_attempts()`. 140 """ 141 142 workspace: CloudWorkspace 143 connection: CloudConnection 144 job_id: int 145 attempt_number: int 146 _attempt_data: dict[str, Any] | None = None 147 148 @property 149 def attempt_id(self) -> int: 150 """Return the attempt ID.""" 151 return self._get_attempt_data()["id"] 152 153 @property 154 def status(self) -> str: 155 """Return the attempt status.""" 156 return self._get_attempt_data()["status"] 157 158 @property 159 def bytes_synced(self) -> int: 160 """Return the number of bytes synced in this attempt.""" 161 return self._get_attempt_data().get("bytesSynced", 0) 162 163 @property 164 def records_synced(self) -> int: 165 """Return the number of records synced in this attempt.""" 166 return self._get_attempt_data().get("recordsSynced", 0) 167 168 @property 169 def created_at(self) -> datetime: 170 """Return the creation time of the attempt.""" 171 timestamp = self._get_attempt_data()["createdAt"] 172 return ab_datetime_parse(timestamp) 173 174 def _get_attempt_data(self) -> dict[str, Any]: 175 """Get attempt data from the provided attempt data.""" 176 if self._attempt_data is None: 177 raise ValueError( 178 "Attempt data not provided. SyncAttempt should be created via " 179 "SyncResult.get_attempts()." 180 ) 181 return self._attempt_data["attempt"] 182 183 def get_full_log_text(self) -> str: 184 """Return the complete log text for this attempt. 185 186 Returns: 187 String containing all log text for this attempt, with lines separated by newlines. 188 """ 189 if self._attempt_data is None: 190 return "" 191 192 logs_data = self._attempt_data.get("logs") 193 if not logs_data: 194 return "" 195 196 result = "" 197 198 if "events" in logs_data: 199 log_events = logs_data["events"] 200 if log_events: 201 log_lines = [] 202 for event in log_events: 203 timestamp = event.get("timestamp", "") 204 level = event.get("level", "INFO") 205 message = event.get("message", "") 206 log_lines.append( 207 f"[{timestamp}] {level}: {message}" # pyrefly: ignore[bad-argument-type] 208 ) 209 result = "\n".join(log_lines) 210 elif "logLines" in logs_data: 211 log_lines = logs_data["logLines"] 212 if log_lines: 213 result = "\n".join(log_lines) 214 215 return result
Represents a single attempt of a sync job.
This class is not meant to be instantiated directly. Instead, obtain a SyncAttempt by
calling .SyncResult.get_attempts().
148 @property 149 def attempt_id(self) -> int: 150 """Return the attempt ID.""" 151 return self._get_attempt_data()["id"]
Return the attempt ID.
153 @property 154 def status(self) -> str: 155 """Return the attempt status.""" 156 return self._get_attempt_data()["status"]
Return the attempt status.
158 @property 159 def bytes_synced(self) -> int: 160 """Return the number of bytes synced in this attempt.""" 161 return self._get_attempt_data().get("bytesSynced", 0)
Return the number of bytes synced in this attempt.
163 @property 164 def records_synced(self) -> int: 165 """Return the number of records synced in this attempt.""" 166 return self._get_attempt_data().get("recordsSynced", 0)
Return the number of records synced in this attempt.
168 @property 169 def created_at(self) -> datetime: 170 """Return the creation time of the attempt.""" 171 timestamp = self._get_attempt_data()["createdAt"] 172 return ab_datetime_parse(timestamp)
Return the creation time of the attempt.
183 def get_full_log_text(self) -> str: 184 """Return the complete log text for this attempt. 185 186 Returns: 187 String containing all log text for this attempt, with lines separated by newlines. 188 """ 189 if self._attempt_data is None: 190 return "" 191 192 logs_data = self._attempt_data.get("logs") 193 if not logs_data: 194 return "" 195 196 result = "" 197 198 if "events" in logs_data: 199 log_events = logs_data["events"] 200 if log_events: 201 log_lines = [] 202 for event in log_events: 203 timestamp = event.get("timestamp", "") 204 level = event.get("level", "INFO") 205 message = event.get("message", "") 206 log_lines.append( 207 f"[{timestamp}] {level}: {message}" # pyrefly: ignore[bad-argument-type] 208 ) 209 result = "\n".join(log_lines) 210 elif "logLines" in logs_data: 211 log_lines = logs_data["logLines"] 212 if log_lines: 213 result = "\n".join(log_lines) 214 215 return result
Return the complete log text for this attempt.
Returns:
String containing all log text for this attempt, with lines separated by newlines.