airbyte.cloud.sync_results
Sync results for Airbyte Cloud workspaces.
Examples
Run a sync job and wait for completion
To get started, we'll need a .CloudConnection object. You can obtain this object by calling
.CloudWorkspace.get_connection().
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
Once we have a .CloudConnection object, we can simply call run_sync()
to start a sync job and wait for it to complete.
# Run a sync job
sync_result: SyncResult = connection.run_sync()
Run a sync job and return immediately
By default, run_sync() will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
wait=False.
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

# Poll until the job reaches a final status
while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
Examining the sync result
You can examine the sync result to get more information about the job:
sync_result: SyncResult = connection.run_sync()
# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
Reading data from Airbyte Cloud sync result
This feature is currently available only for specific SQL-based destinations, such as
Snowflake and BigQuery. To see which destinations are supported, inspect the constant
airbyte.cloud.constants.READABLE_DESTINATION_TYPES.
If your destination is supported, you can read records directly from the SyncResult object.
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()
# Print a list of available stream names
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
    print(record)
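The module source also defines a streams property on SyncResult, described there as a convenience wrapper around stream_names and get_dataset(). The following is a minimal, self-contained sketch of that lazy-mapping pattern. FakeResult, LazyStreams, and the string "datasets" are hypothetical stand-ins for illustration, not part of the library.

```python
from collections.abc import Iterator, Mapping


class FakeResult:
    """Hypothetical stand-in for a sync result (not the real SyncResult)."""

    stream_names = ["users", "orders"]

    def get_dataset(self, stream_name: str) -> str:
        # The real method returns a CachedDataset; a string keeps this runnable.
        return f"dataset:{stream_name}"


class LazyStreams(Mapping[str, str]):
    """Resolve a dataset by stream name only when that key is accessed."""

    def __init__(self, parent: FakeResult) -> None:
        self.parent = parent

    def __getitem__(self, key: str) -> str:
        return self.parent.get_dataset(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self.parent.stream_names)

    def __len__(self) -> int:
        return len(self.parent.stream_names)


streams = LazyStreams(FakeResult())

# Mapping supplies items(), keys(), and values() on top of the three methods above.
for name, dataset in streams.items():
    print(name, dataset)
```

Because the wrapper holds only a reference to its parent, nothing is looked up until a key is requested, which is presumably why the library exposes streams as a mapping rather than building every dataset up front.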
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations.** This includes
SQL-based destinations such as Snowflake and BigQuery. The list of supported destinations may be
determined by inspecting the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the SyncResult object.

```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.caches._utils._dest_to_cache import destination_to_cache
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.cloud.models import CloudConnectionInfo, CloudJobInfo, JobStatusEnum
from airbyte.datasets import CachedDataset
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Get attempt data from the provided attempt data."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: CloudJobInfo | None = None
    _connection_response: CloudConnectionInfo | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> CloudConnectionInfo:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = CloudConnectionInfo.from_api_response(
            api_util.get_connection(
                workspace_id=self.workspace.workspace_id,
                api_root=self.workspace.api_root,
                connection_id=self.connection.connection_id,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the current status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> CloudJobInfo:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = CloudJobInfo.from_api_response(
            api_util.get_job_info(
                job_id=self.job_id,
                api_root=self.workspace.api_root,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            config_api_root=self.workspace.config_api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by
interacting with the .CloudWorkspace and .CloudConnection objects.
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
    def get_job_status(self) -> JobStatusEnum:
        """Return the current status of the sync job."""
        return self._fetch_latest_job_info().status
Return the current status of the sync job.
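According to the module source, job info is fetched again on each status call until the job reaches a final status, after which the cached value is reused. Below is a minimal sketch of that caching rule. StatusTracker, the fetch callable, and the status strings are assumptions made for illustration; the library uses its own FINAL_STATUSES constant and a real API call.

```python
from typing import Callable, Optional

# Assumed status names for illustration only.
FINAL = {"succeeded", "failed", "cancelled"}


class StatusTracker:
    """Hypothetical helper: re-fetch until final, then serve the cached status."""

    def __init__(self, fetch: Callable[[], str]) -> None:
        self._fetch = fetch
        self._latest: Optional[str] = None

    def get_status(self) -> str:
        # A final status can no longer change, so skip the remote call.
        if self._latest is not None and self._latest in FINAL:
            return self._latest
        self._latest = self._fetch()
        return self._latest


responses = iter(["running", "running", "succeeded"])
calls = []  # records each simulated API call


def fake_fetch() -> str:
    calls.append(1)
    return next(responses)


tracker = StatusTracker(fake_fetch)
statuses = [tracker.get_status() for _ in range(5)]
print(statuses, len(calls))
```

Five status checks result in only three simulated fetches here, since the last two are served from the cached final status.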
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0
Return the number of bytes synced.
    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0
Return the number of records processed.
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise
Return the start time of the sync job in UTC.
    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]
Return a list of attempts for this sync job.
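As a rough illustration of how the returned attempts might be inspected, the sketch below summarizes any objects that expose the `attempt_number`, `status`, `records_synced`, and `bytes_synced` attributes documented for `SyncAttempt`. The helper `summarize_attempts` and the stand-in objects are hypothetical and only mimic real attempts; the status strings are assumed values.

```python
from types import SimpleNamespace


def summarize_attempts(attempts):
    """Return one human-readable line per attempt (hypothetical helper)."""
    return [
        f"Attempt {attempt.attempt_number}: {attempt.status} "
        f"({attempt.records_synced} records, {attempt.bytes_synced} bytes)"
        for attempt in attempts
    ]


# Stand-ins for SyncAttempt objects; attempt numbers start at 0, as in get_attempts().
fake_attempts = [
    SimpleNamespace(attempt_number=0, status="failed", records_synced=0, bytes_synced=0),
    SimpleNamespace(attempt_number=1, status="succeeded", records_synced=120, bytes_synced=4096),
]

summary = summarize_attempts(fake_attempts)
print("\n".join(summary))
```

With a real job, the same helper could presumably be called as `summarize_attempts(sync_result.get_attempts())`.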
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set refresh_status=True. If the job has failed, this
method will raise an AirbyteConnectionSyncError.
Otherwise, do nothing.
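The difference between checking the cached status and a refreshed one can be sketched with a small stand-in class. This is not the library's implementation: `FakeSyncResult`, the `FAILED` set, the status strings, and the use of `RuntimeError` are all assumptions made for illustration.

```python
# Assumed failure statuses, for illustration only.
FAILED = {"failed", "cancelled"}


class FakeSyncResult:
    """Hypothetical stand-in that mimics the cached-versus-refreshed status check."""

    def __init__(self, cached_status, live_status):
        self._cached_status = cached_status  # Last status seen locally
        self._live_status = live_status  # What the API would report now

    def get_job_status(self):
        # Pretend to call the API and remember the answer.
        self._cached_status = self._live_status
        return self._live_status

    def raise_failure_status(self, *, refresh_status=False):
        if not refresh_status and self._cached_status:
            status = self._cached_status
        else:
            status = self.get_job_status()
        if status in FAILED:
            raise RuntimeError(f"Sync job ended with status '{status}'")


result = FakeSyncResult(cached_status="running", live_status="failed")
result.raise_failure_status()  # Uses the cached "running" status, so nothing is raised.

try:
    result.raise_failure_status(refresh_status=True)
    outcome = "no error"
except RuntimeError as error:
    outcome = f"raised: {error}"
print(outcome)
```

In this sketch, a stale cached status hides the failure until `refresh_status=True` forces a new lookup.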
    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
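The poll-until-final pattern used here can be sketched without any Airbyte dependency. The following is a minimal illustration under stated assumptions, not the library's code: `poll_until_final`, `FINAL`, the `get_status` callable, and the status strings are hypothetical names introduced for this example.

```python
import time
from collections.abc import Callable

# Assumed set of terminal statuses, for illustration only.
FINAL = {"succeeded", "failed", "cancelled"}


def poll_until_final(
    get_status: Callable[[], str],
    *,
    wait_timeout: float = 10.0,
    poll_interval: float = 0.01,
    raise_timeout: bool = True,
) -> str:
    """Poll `get_status` until it returns a final status or the timeout elapses."""
    start = time.monotonic()
    while True:
        status = get_status()
        if status in FINAL:
            return status
        if time.monotonic() - start > wait_timeout:
            if raise_timeout:
                raise TimeoutError(f"Still '{status}' after {wait_timeout} seconds")
            return status  # Non-final status
        time.sleep(poll_interval)


# Usage: a fake job that reports "running" twice, then "succeeded".
_statuses = iter(["running", "running", "succeeded"])
final_status = poll_until_final(lambda: next(_statuses))
print(final_status)
```

As in `wait_for_completion()`, setting `raise_timeout=False` returns the last observed, non-final status instead of raising when the timeout is exceeded.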
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination.
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the CachedDataset object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()
Return the SQL database name.
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name
Return the SQL schema name.
    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names
Return the list of stream names.
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names
property and get_dataset() method.
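The source of `_SyncResultStreams` is not shown on this page, so the following is only a sketch of how a read-only mapping could wrap a list of stream names and a dataset loader. `LazyStreams` and the `get_dataset` callable passed to it are hypothetical, and the string "datasets" stand in for real `CachedDataset` objects.

```python
from collections.abc import Callable, Iterator, Mapping
from typing import Any


class LazyStreams(Mapping):
    """Read-only mapping that builds a dataset on each lookup (hypothetical sketch)."""

    def __init__(self, stream_names: list[str], get_dataset: Callable[[str], Any]) -> None:
        self._stream_names = list(stream_names)
        self._get_dataset = get_dataset

    def __getitem__(self, key: str) -> Any:
        if key not in self._stream_names:
            raise KeyError(key)
        return self._get_dataset(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._stream_names)

    def __len__(self) -> int:
        return len(self._stream_names)


# Usage with a fake loader; a real one would return dataset objects.
streams = LazyStreams(["users", "orders"], get_dataset=lambda name: f"<dataset for {name}>")
for name, dataset in streams.items():
    print(name, dataset)
```

Subclassing `Mapping` supplies `keys()`, `items()`, `values()`, and `in` checks from the three methods defined here.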
@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Get attempt data from the provided attempt data."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result
Represents a single attempt of a sync job.
This class is not meant to be instantiated directly. Instead, obtain a SyncAttempt by
calling .SyncResult.get_attempts().
    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]
Return the attempt ID.
    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]
Return the attempt status.
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)
Return the number of bytes synced in this attempt.
    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)
Return the number of records synced in this attempt.
    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)
Return the creation time of the attempt.
    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(
                        f"[{timestamp}] {level}: {message}"  # pyrefly: ignore[bad-argument-type]
                    )
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result
Return the complete log text for this attempt.
Returns:
String containing all log text for this attempt, with lines separated by newlines.
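To make the two log shapes handled by this method concrete, the sketch below applies the same formatting rules to plain dictionaries. `format_logs` is a hypothetical standalone helper, and the sample payloads are invented for illustration rather than taken from a real API response.

```python
def format_logs(logs_data):
    """Format a logs payload using the "events" shape first, then the "logLines" shape."""
    if not logs_data:
        return ""
    if "events" in logs_data:
        # Structured events: render each one as "[timestamp] LEVEL: message".
        return "\n".join(
            f"[{event.get('timestamp', '')}] {event.get('level', 'INFO')}: "
            f"{event.get('message', '')}"
            for event in logs_data["events"] or []
        )
    if "logLines" in logs_data:
        # Plain lines: join them as they are.
        return "\n".join(logs_data["logLines"] or [])
    return ""


# Invented sample payloads, one per shape.
structured = {"events": [{"timestamp": "t0", "level": "WARN", "message": "slow"}, {"message": "done"}]}
plain = {"logLines": ["line one", "line two"]}

structured_text = format_logs(structured)
plain_text = format_logs(plain)
print(structured_text)
print(plain_text)
```

An event without a `level` falls back to `INFO`, and a missing `timestamp` or `message` becomes an empty string, matching the defaults in the method above.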