# airbyte.cloud.sync_results

Sync results for Airbyte Cloud workspaces.

## Examples

### Run a sync job and wait for completion

To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.

```python
# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately

By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.

```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
### Examining the sync result

You can examine the sync result to get more information about the job:

```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
### Reading data from an Airbyte Cloud sync result

**This feature is currently only available for specific SQL-based destinations**, such as
Snowflake and BigQuery. The full list of supported destinations can be determined by inspecting
the constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
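For a quick check at runtime, you can print the constant directly (a minimal sketch; the exact
contents depend on your PyAirbyte version):

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print the destination types that support reading records back
print(READABLE_DESTINATION_TYPES)
```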
If your destination is supported, you can read records directly from the `SyncResult` object.
```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
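If you prefer DataFrames, a `CachedDataset` can generally be converted with `to_pandas()`
(a minimal sketch; assumes `pandas` is installed and `dataset` is the `CachedDataset` from the
example above):

```python
# A minimal sketch: load the synced stream into a pandas DataFrame.
df = dataset.to_pandas()
print(df.head())
```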
---

`DEFAULT_SYNC_TIMEOUT_SECONDS`

```python
DEFAULT_SYNC_TIMEOUT_SECONDS = 30 * 60  # 30 minutes
```

The default timeout for waiting for a sync job to complete, in seconds.
`SyncResult`

```python
@dataclass
class SyncResult:
    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
```

The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
```python
@property
def job_url(self) -> str:
    """Return the URL of the sync job."""
    return f"{self.connection.job_history_url}/{self.job_id}"
```

Return the URL of the sync job.
```python
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
```

Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
    """Return the status of the sync job."""
    return self._fetch_latest_job_info().status
```

Return the status of the sync job.
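As a usage sketch, you might branch on the returned status. The import path below is taken from
this module's own imports, but the enum member name is an assumption based on the Airbyte API's
job status values:

```python
from airbyte._util.api_imports import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:  # member name assumed from the Airbyte API
    print("Sync finished successfully.")
else:
    print(f"Sync status: {status}")
```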
```python
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced or 0
```

Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced or 0
```

Return the number of records synced.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    # Parse from ISO 8601 format:
    return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
```

Return the start time of the sync job in UTC.
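As a small usage sketch, you can compute how long a job has been running (assumes the API
returns a timezone-aware ISO 8601 timestamp, so the subtraction is valid):

```python
from datetime import datetime, timezone

# A minimal sketch: elapsed time since the job started.
# Assumes `sync_result.start_time` is timezone-aware.
elapsed = datetime.now(timezone.utc) - sync_result.start_time
print(f"Job started {elapsed.total_seconds():.0f} seconds ago")
```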
```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
```

Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set `refresh_status=True`. If the job has failed, this
method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
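As a usage sketch, you can convert a failed job into a handled exception (assumes an existing
`sync_result`):

```python
from airbyte.exceptions import AirbyteConnectionSyncError

# A minimal sketch: refresh the status, then raise if the job failed.
try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Sync failed: {ex}")
```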
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```

Wait for a job to finish running.
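A typical call bounds the wait and converts failures into exceptions (a minimal sketch; assumes
`sync_result` came from `run_sync(wait=False)`):

```python
# A minimal sketch: wait up to 10 minutes; raise on timeout or failure.
final_status = sync_result.wait_for_completion(
    wait_timeout=10 * 60,
    raise_timeout=True,
    raise_failure=True,
)
print(f"Final status: {final_status}")
```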
```python
def get_sql_cache(self) -> CacheBase:
    """Return a SQL Cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
```

Return a SQL Cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
```

Return a SQL Engine for querying a SQL-based destination.
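As a usage sketch, the engine can be combined with `get_sql_table_name()` (documented below) to
run ad-hoc queries. This assumes a supported SQL destination and that `"users"` is a stream in
your connection:

```python
import sqlalchemy

# A minimal sketch: count rows in a synced stream's table.
# Depending on the destination, the table may need schema qualification.
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")  # "users" is a hypothetical stream
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
    print(f"{table_name}: {row_count} rows")
```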
```python
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```

Return the SQL table name of the named stream.
```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
```

Return a SQLAlchemy table object for the named stream.
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
```

Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the `CachedDataset` object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
```python
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
```

Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
```

Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names."""
    return self.connection.stream_names
```

Return the list of stream names.
```python
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
```

Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and the `get_dataset()` method.
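Because `streams` behaves like a `Mapping`, it supports standard dict-style access (a minimal
sketch; assumes a readable SQL destination and an existing `sync_result`):

```python
# A minimal sketch: list each synced stream and its first record.
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")
    for record in dataset:
        print(record)
        break  # only the first record per stream
```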