# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.
```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately
By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.
```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
### Examining the sync result
You can examine the sync result to get more information about the job:
```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
### Reading data from Airbyte Cloud sync result
**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.

If your destination is supported, you can read records directly from the `SyncResult` object.
```python
# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
## `SyncResult`

The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
### `SyncResult.job_url`

```python
@property
def job_url(self) -> str:
    """Return the URL of the sync job.

    Note: This currently returns the connection's job history URL, as there is no direct URL
    to a specific job in the Airbyte Cloud web app.

    TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
    E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
    """
    return f"{self.connection.job_history_url}"
```
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job-logs URL based on the event ID of the specific attempt, e.g. `{self.connection.job_history_url}?eventId={event-guid}&openLogs=true`.
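For example, the URL can be logged alongside the job ID when kicking off a sync:

```python
# Assuming `sync_result` came from `connection.run_sync()`...
print(f"Started job {sync_result.job_id}: {sync_result.job_url}")
```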
### `SyncResult.is_job_complete`

```python
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
```
Check if the sync job is complete.
### `SyncResult.get_job_status`

```python
def get_job_status(self) -> JobStatusEnum:
    """Return the latest status of the sync job."""
    return self._fetch_latest_job_info().status
```
Return the latest status of the sync job.
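For example, a one-off status check. The implementation above compares against `FINAL_STATUSES`, which this module imports from `airbyte.cloud.constants`, so the same constant can be used here:

```python
from airbyte.cloud.constants import FINAL_STATUSES

status = sync_result.get_job_status()
if status in FINAL_STATUSES:
    print(f"Job finished with status: {status}")
else:
    print(f"Job still in progress: {status}")
```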
### `SyncResult.bytes_synced`

```python
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced or 0
```
Return the number of bytes synced.
### `SyncResult.records_synced`

```python
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced or 0
```
Return the number of records synced.
### `SyncResult.start_time`

```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    # Parse from ISO 8601 format:
    return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
```
Return the start time of the sync job in UTC.
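Since the value is a `datetime`, it can be used in ordinary datetime arithmetic. A small sketch for estimating how long ago the job started; note the normalization step is an assumption, since `fromisoformat()` may return a naive datetime depending on the API's timestamp format:

```python
from datetime import datetime, timezone

start = sync_result.start_time
if start.tzinfo is None:
    # Assumption: naive timestamps are UTC, per the docstring above.
    start = start.replace(tzinfo=timezone.utc)

elapsed = datetime.now(timezone.utc) - start
print(f"Job started {elapsed.total_seconds():.0f} seconds ago")
```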
### `SyncResult.raise_failure_status`

```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
```
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set `refresh_status=True`. If the job has failed, this
method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
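For example, a sketch of surfacing failures after a non-blocking sync, using this module's exception type:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

sync_result = connection.run_sync(wait=False)
sync_result.wait_for_completion(raise_failure=False, raise_timeout=False)
try:
    # Refresh the status before checking, in case it changed since the last poll:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
```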
### `SyncResult.wait_for_completion`

```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```
Wait for a job to finish running, up to `wait_timeout` seconds (30 minutes by default). On timeout, an `AirbyteConnectionSyncTimeoutError` is raised unless `raise_timeout=False`, in which case the latest (non-final) status is returned. Set `raise_failure=True` to also raise an `AirbyteConnectionSyncError` if the job reaches a failed status.
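For example, to wait up to ten minutes, raising on failure but returning quietly on timeout:

```python
# Assuming `sync_result` came from `connection.run_sync(wait=False)`...
status = sync_result.wait_for_completion(
    wait_timeout=10 * 60,  # seconds
    raise_timeout=False,   # return the non-final status instead of raising
    raise_failure=True,    # raise AirbyteConnectionSyncError on failure
)
print(f"Latest status: {status}")
```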
### `SyncResult.get_sql_cache`

```python
def get_sql_cache(self) -> CacheBase:
    """Return a SQL Cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
```
Return a SQL Cache object for working with the data in a SQL-based destination.
### `SyncResult.get_sql_engine`

```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
```
Return a SQL Engine for querying a SQL-based destination.
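The result is a standard SQLAlchemy engine, so it can be used for ad-hoc queries. A sketch, reusing the `users` stream from the examples above; depending on the destination and its connection defaults, the table name may need schema qualification:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")

with engine.connect() as conn:
    result = conn.execute(sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}"))
    print(f"{table_name}: {result.scalar()} rows")
```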
### `SyncResult.get_sql_table_name`

```python
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```
Return the SQL table name of the named stream.
### `SyncResult.get_sql_table`

```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
```
Return a SQLAlchemy table object for the named stream.
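Because this returns a real `sqlalchemy.Table`, you can compose queries against it rather than writing raw SQL strings. For example:

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")

# Build a query against the table object and fetch a few rows:
stmt = sqlalchemy.select(users_table).limit(10)
with sync_result.get_sql_engine().connect() as conn:
    for row in conn.execute(stmt):
        print(row)
```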
### `SyncResult.get_dataset`

```python
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the `CachedDataset` object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
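A `CachedDataset` can be iterated directly, as in the module example above, or handed to dataframe tooling via the `to_pandas()` helper that PyAirbyte datasets expose (requires pandas installed):

```python
dataset = sync_result.get_dataset("users")

# Stream records one at a time...
for record in dataset:
    print(record)

# ...or load the whole stream into a pandas DataFrame:
df = dataset.to_pandas()
print(df.describe())
```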
### `SyncResult.get_sql_database_name`

```python
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
```
Return the SQL database name.
### `SyncResult.get_sql_schema_name`

```python
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
```
Return the SQL schema name.
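Together with `get_sql_table_name()`, these can be combined into a fully qualified name for use in external SQL tooling. A sketch only; the exact quoting and qualification rules vary by destination:

```python
# Compose a dotted database.schema.table identifier (assumption: the
# destination accepts simple dot-qualified, unquoted names):
fully_qualified = ".".join(
    [
        sync_result.get_sql_database_name(),
        sync_result.get_sql_schema_name(),
        sync_result.get_sql_table_name("users"),
    ]
)
print(f"Query this table as: {fully_qualified}")
```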
### `SyncResult.stream_names`

```python
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names."""
    return self.connection.stream_names
```
Return the list of stream names.
### `SyncResult.streams`

```python
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and the `get_dataset()` method.
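Since `streams` behaves like a read-only mapping, the usual mapping idioms apply. For example:

```python
# Look up a single stream by name:
users_dataset = sync_result.streams["users"]

# Or iterate over all synced streams:
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {len(list(dataset))} records")
```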