# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from an Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations may be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
Module source:

````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```

Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```

### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```

### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```

### Reading data from an Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations may be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.

```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

------

"""

from __future__ import annotations

import time
from collections.abc import Iterator, Mapping
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING, Any

from typing_extensions import final

from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse

from airbyte._util import api_util
from airbyte.cloud.constants import FAILED_STATUSES, FINAL_STATUSES
from airbyte.datasets import CachedDataset
from airbyte.destinations._translate_dest_to_cache import destination_to_cache
from airbyte.exceptions import AirbyteConnectionSyncError, AirbyteConnectionSyncTimeoutError


DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
"""The default timeout for waiting for a sync job to complete, in seconds."""

if TYPE_CHECKING:
    from datetime import datetime

    import sqlalchemy

    from airbyte._util.api_imports import ConnectionResponse, JobResponse, JobStatusEnum
    from airbyte.caches.base import CacheBase
    from airbyte.cloud.connections import CloudConnection
    from airbyte.cloud.workspaces import CloudWorkspace


@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the attempt data provided at creation time."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(f"[{timestamp}] {level}: {message}")
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result


@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
        E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from the Config API, using a lazy-loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)


__all__ = [
    "SyncResult",
    "SyncAttempt",
]
````
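The `_SyncResultStreams` helper in the module source above shows a useful pattern: by subclassing `collections.abc.Mapping`, only `__getitem__`, `__iter__`, and `__len__` need to be written, and `keys()`, `items()`, `get()`, and `in` membership come for free. A minimal standalone sketch of the same idea, with a plain list of stream names standing in for a real `SyncResult` parent:

```python
from collections.abc import Iterator, Mapping


class StreamMap(Mapping[str, str]):
    """Lazy name-to-dataset mapping, mirroring the _SyncResultStreams pattern."""

    def __init__(self, stream_names: list[str]) -> None:
        self._stream_names = stream_names

    def __getitem__(self, key: str) -> str:
        if key not in self._stream_names:
            raise KeyError(key)
        # A real implementation would build a dataset here, as
        # _SyncResultStreams does via parent.get_dataset(stream_name=key).
        return f"dataset:{key}"

    def __iter__(self) -> Iterator[str]:
        return iter(self._stream_names)

    def __len__(self) -> int:
        return len(self._stream_names)


streams = StreamMap(["users", "orders"])
print("users" in streams)     # membership is a Mapping mixin method
print(dict(streams.items()))  # items() is, too
```

Because the mapping is lazy, no dataset is constructed until a key is actually accessed, which is exactly why the real `streams` property can be cheap to return.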
### `class SyncResult`

The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
### `job_url`

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a
specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event ID of the specific attempt number,
e.g. `{self.connection.job_history_url}?eventId={event-guid}&openLogs=true`.
### `is_job_complete()`

Check if the sync job is complete.
### `get_job_status()`

Return the latest status of the sync job.
### `bytes_synced`

Return the number of bytes processed.
### `records_synced`

Return the number of records processed.
### `start_time`

Return the start time of the sync job in UTC. If the primary job info payload carries a
timestamp that cannot be parsed as ISO-8601, the raw `startTime` from the Config API is used as
a fallback.
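The library parses timestamps with its own `ab_datetime_parse` helper; the general idea of lenient start-time parsing can be sketched with only the standard library. The epoch-milliseconds branch below is an illustrative assumption about what a raw fallback value might look like, not the library's exact behavior:

```python
from datetime import datetime, timezone


def parse_job_start_time(value) -> datetime:
    """Lenient start-time parsing: accept ISO-8601 strings or epoch milliseconds."""
    if isinstance(value, (int, float)):
        # Assumed fallback shape: an epoch timestamp in milliseconds.
        return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
    # fromisoformat handles "2024-01-01T00:00:00+00:00"; normalize a trailing "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


print(parse_job_start_time("2024-01-01T00:00:00Z"))
print(parse_job_start_time(1704067200000))
```

Both calls above resolve to the same timezone-aware UTC instant.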
### `get_attempts()`

Return a list of attempts for this sync job.
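Each returned `SyncAttempt` can render its structured log events as plain text via `get_full_log_text()`, which joins events into `[timestamp] LEVEL: message` lines. A standalone sketch of that same event-to-line conversion, using a hand-built sample payload rather than a real API response:

```python
def format_log_events(events: list[dict]) -> str:
    """Render structured log events as '[timestamp] LEVEL: message' lines."""
    return "\n".join(
        f"[{event.get('timestamp', '')}] {event.get('level', 'INFO')}: {event.get('message', '')}"
        for event in events
    )


# Sample payload shaped like the "events" list the attempt logs carry.
sample_events = [
    {"timestamp": "2024-01-01T00:00:00Z", "level": "INFO", "message": "Attempt started"},
    {"timestamp": "2024-01-01T00:05:00Z", "level": "ERROR", "message": "Connection reset"},
]
print(format_log_events(sample_events))
```

Using `.get()` with defaults keeps the formatter tolerant of events that omit a field, matching the defensive style of the real method.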
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.

Otherwise, do nothing.
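The failure check itself reduces to a membership test against a set of terminal failure statuses. A minimal standalone sketch of that logic (the status strings and exception class here are illustrative stand-ins, not the exact members of FAILED_STATUSES):

```python
FAILED_STATUSES = {"failed", "cancelled"}  # illustrative values


class ConnectionSyncError(Exception):
    """Stand-in for AirbyteConnectionSyncError."""


def raise_failure_status(latest_status: str) -> None:
    """Raise only when the job ended in a terminal failure status."""
    if latest_status in FAILED_STATUSES:
        raise ConnectionSyncError(f"Sync job ended with status: {latest_status}")


raise_failure_status("succeeded")  # no-op for successful or running jobs
try:
    raise_failure_status("failed")
except ConnectionSyncError as exc:
    print(exc)
```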
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
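The method above is a poll-until-final-status loop with a wall-clock timeout. The same pattern can be demonstrated standalone with a stubbed status source (the status strings, function names, and intervals here are illustrative):

```python
import time

FINAL_STATUSES = {"succeeded", "failed", "cancelled"}  # illustrative values


def wait_for_completion(get_status, wait_timeout: float = 30.0, poll_interval: float = 0.01) -> str:
    """Poll get_status() until a final status is seen or the timeout elapses."""
    start = time.time()
    while True:
        status = get_status()
        if status in FINAL_STATUSES:
            return status
        if time.time() - start > wait_timeout:
            raise TimeoutError(f"Job still in status {status!r} after {wait_timeout}s")
        time.sleep(poll_interval)


# Simulate a job that reaches a final status on the third poll:
statuses = iter(["pending", "running", "succeeded"])
print(wait_for_completion(lambda: next(statuses)))  # succeeded
```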
def get_sql_cache(self) -> CacheBase:
    """Return a SQL Cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination.
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
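Once you have the engine, querying the destination is ordinary SQL. Since a live destination is needed to run the real thing, the query pattern is shown here against an in-memory SQLite database from the standard library, standing in for the engine returned by this method (table and column names are illustrative):

```python
import sqlite3

# In real usage: engine = sync_result.get_sql_engine()
# Here an in-memory SQLite connection stands in for the destination.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "ada"), (2, "grace")])

rows = conn.execute("SELECT name FROM users ORDER BY id").fetchall()
print(rows)  # [('ada',), ('grace',)]
```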
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
Return the SQL database name.
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
Return the SQL schema name.
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names."""
    return self.connection.stream_names
Return the list of stream names.
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
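A convenience wrapper like this is typically a lazy, read-only mapping: iteration and length come from the stream names, and item access defers to the dataset lookup. A self-contained sketch of that wrapper pattern (class and helper names here are illustrative, not the actual _SyncResultStreams implementation):

```python
from collections.abc import Mapping


class StreamsView(Mapping):
    """Lazy, read-only mapping from stream name to dataset."""

    def __init__(self, stream_names, get_dataset):
        self._stream_names = list(stream_names)
        self._get_dataset = get_dataset

    def __getitem__(self, name):
        if name not in self._stream_names:
            raise KeyError(name)
        return self._get_dataset(name)  # dataset is only built on access

    def __iter__(self):
        return iter(self._stream_names)

    def __len__(self):
        return len(self._stream_names)


streams = StreamsView(["users", "orders"], lambda name: f"<dataset for {name}>")
print(list(streams))     # ['users', 'orders']
print(streams["users"])  # <dataset for users>
```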
@dataclass
class SyncAttempt:
    """Represents a single attempt of a sync job.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncAttempt` by
    calling `.SyncResult.get_attempts()`.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    attempt_number: int
    _attempt_data: dict[str, Any] | None = None

    @property
    def attempt_id(self) -> int:
        """Return the attempt ID."""
        return self._get_attempt_data()["id"]

    @property
    def status(self) -> str:
        """Return the attempt status."""
        return self._get_attempt_data()["status"]

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced in this attempt."""
        return self._get_attempt_data().get("bytesSynced", 0)

    @property
    def records_synced(self) -> int:
        """Return the number of records synced in this attempt."""
        return self._get_attempt_data().get("recordsSynced", 0)

    @property
    def created_at(self) -> datetime:
        """Return the creation time of the attempt."""
        timestamp = self._get_attempt_data()["createdAt"]
        return ab_datetime_parse(timestamp)

    def _get_attempt_data(self) -> dict[str, Any]:
        """Return the attempt data provided at construction time."""
        if self._attempt_data is None:
            raise ValueError(
                "Attempt data not provided. SyncAttempt should be created via "
                "SyncResult.get_attempts()."
            )
        return self._attempt_data["attempt"]

    def get_full_log_text(self) -> str:
        """Return the complete log text for this attempt.

        Returns:
            String containing all log text for this attempt, with lines separated by newlines.
        """
        if self._attempt_data is None:
            return ""

        logs_data = self._attempt_data.get("logs")
        if not logs_data:
            return ""

        result = ""

        if "events" in logs_data:
            log_events = logs_data["events"]
            if log_events:
                log_lines = []
                for event in log_events:
                    timestamp = event.get("timestamp", "")
                    level = event.get("level", "INFO")
                    message = event.get("message", "")
                    log_lines.append(f"[{timestamp}] {level}: {message}")
                result = "\n".join(log_lines)
        elif "logLines" in logs_data:
            log_lines = logs_data["logLines"]
            if log_lines:
                result = "\n".join(log_lines)

        return result
Represents a single attempt of a sync job.

This class is not meant to be instantiated directly. Instead, obtain a SyncAttempt by calling .SyncResult.get_attempts().
@property
def attempt_id(self) -> int:
    """Return the attempt ID."""
    return self._get_attempt_data()["id"]
Return the attempt ID.
@property
def status(self) -> str:
    """Return the attempt status."""
    return self._get_attempt_data()["status"]
Return the attempt status.
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced in this attempt."""
    return self._get_attempt_data().get("bytesSynced", 0)
Return the number of bytes synced in this attempt.
@property
def records_synced(self) -> int:
    """Return the number of records synced in this attempt."""
    return self._get_attempt_data().get("recordsSynced", 0)
Return the number of records synced in this attempt.
@property
def created_at(self) -> datetime:
    """Return the creation time of the attempt."""
    timestamp = self._get_attempt_data()["createdAt"]
    return ab_datetime_parse(timestamp)
Return the creation time of the attempt.
def get_full_log_text(self) -> str:
    """Return the complete log text for this attempt.

    Returns:
        String containing all log text for this attempt, with lines separated by newlines.
    """
    if self._attempt_data is None:
        return ""

    logs_data = self._attempt_data.get("logs")
    if not logs_data:
        return ""

    result = ""

    if "events" in logs_data:
        log_events = logs_data["events"]
        if log_events:
            log_lines = []
            for event in log_events:
                timestamp = event.get("timestamp", "")
                level = event.get("level", "INFO")
                message = event.get("message", "")
                log_lines.append(f"[{timestamp}] {level}: {message}")
            result = "\n".join(log_lines)
    elif "logLines" in logs_data:
        log_lines = logs_data["logLines"]
        if log_lines:
            result = "\n".join(log_lines)

    return result
Return the complete log text for this attempt.
Returns: String containing all log text for this attempt, with lines separated by newlines.
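The structured-events branch of this method renders each event as "[timestamp] LEVEL: message". That formatting step can be exercised standalone on a sample logs payload (the payload shape mirrors what the method reads; the helper name is illustrative):

```python
def format_log_events(logs_data: dict) -> str:
    """Format structured log events the same way the events branch above does."""
    events = logs_data.get("events") or []
    lines = [
        f"[{e.get('timestamp', '')}] {e.get('level', 'INFO')}: {e.get('message', '')}"
        for e in events
    ]
    return "\n".join(lines)


sample = {
    "events": [
        {"timestamp": "2024-01-15T10:30:00Z", "level": "INFO", "message": "Starting sync"},
        {"timestamp": "2024-01-15T10:31:00Z", "message": "Read 100 records"},  # level defaults to INFO
    ]
}
print(format_log_events(sample))
```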