# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion
To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.
```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately
By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.
```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
### Examining the sync result
You can examine the sync result to get more information about the job:
```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
### Reading data from Airbyte Cloud sync result
**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The full list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
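To check support programmatically, you can iterate the constant directly. A minimal sketch,
assuming the constant is an iterable of destination-type values:

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print each destination type that supports reading sync results directly
for destination_type in READABLE_DESTINATION_TYPES:
    print(destination_type)
```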
If your destination is supported, you can read records directly from the `SyncResult` object.
```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
## Module constants

### DEFAULT_SYNC_TIMEOUT_SECONDS

`DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes`

The default timeout for waiting for a sync job to complete, in seconds.

## class SyncResult

`@dataclass` fields: `workspace: CloudWorkspace`, `connection: CloudConnection`, `job_id: int`, `table_name_prefix: str = ""`, `table_name_suffix: str = ""`
The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
### job_url: str
Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event ID of the specific attempt number, e.g. `{self.connection.job_history_url}?eventId={event-guid}&openLogs=true`.
### is_job_complete() -> bool
Check if the sync job is complete.
### get_job_status() -> JobStatusEnum
Return the latest status of the sync job.
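The returned status can be compared against the status constants that this module itself imports from `airbyte.cloud.constants`. A small sketch, reusing the `sync_result` from the examples above:

```python
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES

status = sync_result.get_job_status()
if status in FAILED_STATUSES:
    print(f"Job failed with status: {status}")
elif status in FINAL_STATUSES:
    print(f"Job finished with status: {status}")
else:
    print(f"Job is still running with status: {status}")
```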
### bytes_synced: int
Return the number of bytes synced.
### records_synced: int
Return the number of records synced.
### start_time: datetime
Return the start time of the sync job in UTC.
### get_attempts() -> list[SyncAttempt]
Return a list of attempts for this sync job.
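For example, to summarize each attempt of a completed job (reusing the `sync_result` from the examples above):

```python
for attempt in sync_result.get_attempts():
    # Each SyncAttempt exposes its number, status, and per-attempt counters
    print(
        f"Attempt #{attempt.attempt_number}: {attempt.status} "
        f"({attempt.records_synced} records, {attempt.bytes_synced} bytes)"
    )
```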
### raise_failure_status(*, refresh_status: bool = False) -> None
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set `refresh_status=True`. If the job has failed, this
method will raise an `AirbyteConnectionSyncError`. Otherwise, it does nothing.
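A sketch of how this pairs with a non-blocking sync, using the `AirbyteConnectionSyncError` exception that this module imports from `airbyte.exceptions`:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

sync_result = connection.run_sync(wait=False)

# ... later, after the job has had time to run:
try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
```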
### wait_for_completion(*, wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, raise_timeout: bool = True, raise_failure: bool = False) -> JobStatusEnum
Wait for the job to finish running and return the final status. If the job does not reach a final status within `wait_timeout` seconds, either raise an `AirbyteConnectionSyncTimeoutError` (when `raise_timeout=True`, the default) or return the current, non-final status. If `raise_failure=True`, a failed job raises an `AirbyteConnectionSyncError`.
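For example, a sketch that waits up to ten minutes and inspects the outcome rather than raising on timeout:

```python
from airbyte.cloud.constants import FINAL_STATUSES

sync_result = connection.run_sync(wait=False)

# Wait up to 10 minutes; return the current status instead of raising on timeout
status = sync_result.wait_for_completion(
    wait_timeout=10 * 60,
    raise_timeout=False,
)
if status not in FINAL_STATUSES:
    print(f"Job still running after the wait timeout; current status: {status}")
```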
### get_sql_cache() -> CacheBase
Return a SQL cache object for working with the data in a SQL-based destination.
### get_sql_engine() -> sqlalchemy.engine.Engine
Return a SQL Engine for querying a SQL-based destination.
### get_sql_table_name(stream_name: str) -> str
Return the SQL table name of the named stream.
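Together with `get_sql_engine()` and `get_sql_schema_name()`, the table name can be used in handwritten SQL. A minimal sketch; the `users` stream is hypothetical, and the exact identifier quoting and qualification rules depend on your destination:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
schema_name = sync_result.get_sql_schema_name()
table_name = sync_result.get_sql_table_name("users")  # hypothetical stream name

with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {schema_name}.{table_name}")
    ).scalar()
    print(f"{table_name} contains {row_count} rows")
```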
### get_sql_table(stream_name: str) -> sqlalchemy.Table
Return a SQLAlchemy table object for the named stream.
### get_dataset(stream_name: str) -> CachedDataset
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the `CachedDataset` object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
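Since the returned `CachedDataset` behaves like other PyAirbyte datasets, it can also be converted to a pandas DataFrame. A short sketch, assuming a stream named `users` exists in your connection and pandas is installed:

```python
dataset = sync_result.get_dataset("users")  # "users" is a hypothetical stream name

# Convert the cached data to a pandas DataFrame for analysis
df = dataset.to_pandas()
print(df.head())
```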
### get_sql_database_name() -> str
Return the SQL database name.
### get_sql_schema_name() -> str
Return the SQL schema name.
### stream_names: list[str]
Return the list of stream names for the connection.
### streams: Mapping[str, CachedDataset]
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and the `get_dataset()` method.
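Because the returned object implements the standard `Mapping` interface, dict-style access works as you would expect. Stream names below are hypothetical:

```python
# List the available stream names
print(list(sync_result.streams.keys()))

# Look up a single stream's dataset
users_dataset = sync_result.streams["users"]

# Iterate over all streams
for stream_name in sync_result.streams:
    print(stream_name)
```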
## class SyncAttempt

`@dataclass` fields: `workspace: CloudWorkspace`, `connection: CloudConnection`, `job_id: int`, `attempt_number: int`
Represents a single attempt of a sync job.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
calling `.SyncResult.get_attempts()`.
### attempt_id: int
Return the attempt ID.
### status: str
Return the attempt status.
### bytes_synced: int
Return the number of bytes synced in this attempt.
### records_synced: int
Return the number of records synced in this attempt.
### created_at: datetime
Return the creation time of the attempt.
### get_full_log_text() -> str
Return the complete log text for this attempt.
Returns:
String containing all log text for this attempt, with lines separated by newlines.
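For example, to print the logs of the most recent attempt (a sketch assuming the job has at least one attempt):

```python
attempts = sync_result.get_attempts()
if attempts:
    last_attempt = attempts[-1]
    print(f"Logs for attempt #{last_attempt.attempt_number}:")
    print(last_attempt.get_full_log_text())
```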