# airbyte.cloud.sync_results
Sync results for Airbyte Cloud workspaces.
## Examples
### Run a sync job and wait for completion
To get started, we'll need a `.CloudConnection` object. You can obtain this object by calling
`.CloudWorkspace.get_connection()`.
```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Get a connection object
connection = workspace.get_connection(connection_id="456")
```
Once we have a `.CloudConnection` object, we can simply call `run_sync()`
to start a sync job and wait for it to complete.
```python
from airbyte.cloud.sync_results import SyncResult

# Run a sync job
sync_result: SyncResult = connection.run_sync()
```
### Run a sync job and return immediately
By default, `run_sync()` will wait for the job to complete and raise an
exception if the job fails. You can instead return immediately by setting
`wait=False`.
```python
import time

# Start the sync job and return immediately
sync_result: SyncResult = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    print("Job is still running...")
    time.sleep(5)

print(f"Job is complete! Status: {sync_result.get_job_status()}")
```
### Examining the sync result
You can examine the sync result to get more information about the job:
```python
sync_result: SyncResult = connection.run_sync()

# Print the job details
print(
    f'''
    Job ID: {sync_result.job_id}
    Job URL: {sync_result.job_url}
    Start Time: {sync_result.start_time}
    Records Synced: {sync_result.records_synced}
    Bytes Synced: {sync_result.bytes_synced}
    Job Status: {sync_result.get_job_status()}
    List of Stream Names: {', '.join(sync_result.stream_names)}
    '''
)
```
### Reading data from Airbyte Cloud sync result
**This feature is currently only available for specific SQL-based destinations,** such as
Snowflake and BigQuery. The list of supported destinations can be determined by inspecting the
constant `airbyte.cloud.constants.READABLE_DESTINATION_TYPES`.
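For a quick check, you can inspect the constant directly. A minimal sketch (the exact collection
type of the constant may vary):

```python
from airbyte.cloud.constants import READABLE_DESTINATION_TYPES

# Print the destination types that support reading records back:
print(READABLE_DESTINATION_TYPES)
```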
If your destination is supported, you can read records directly from the `SyncResult` object.
```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...
sync_result = connection.get_sync_result()

# Print a list of available stream names
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get the SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
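As a follow-up, a dataset can also be loaded into a pandas DataFrame for analysis. A minimal
sketch, assuming pandas is installed and continuing from the `dataset` object above:

```python
# Load the full stream into a pandas DataFrame:
df = dataset.to_pandas()
print(df.head())
```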
---

```python
@dataclass
class SyncResult:
    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: str
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
```
The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
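A minimal sketch of two typical ways to obtain a `SyncResult`, assuming a `connection` object
created as in the module examples above:

```python
# Start a new sync job and wait for it to complete:
sync_result = connection.run_sync()

# Or fetch the result of the most recent sync job, as in the reading example above:
sync_result = connection.get_sync_result()
```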
```python
@property
def job_url(self) -> str:
    """Return the URL of the sync job."""
    return f"{self.connection.job_history_url}/{self.job_id}"
```
Return the URL of the sync job.
```python
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
```
Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
    """Return the status of the sync job."""
    return self._fetch_latest_job_info().status
```
Return the status of the sync job.
```python
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced
```
Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced
```
Return the number of records synced.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    # Parse from ISO 8601 format:
    return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
```
Return the start time of the sync job in UTC.
```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
```
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set `refresh_status=True`. If the job has failed, this
method will raise an `AirbyteConnectionSyncError`. Otherwise, it does nothing.
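For instance, after polling with `wait=False`, this method can serve as a final failure check.
A small sketch that mirrors the polling loop from the module examples above:

```python
import time

# Assuming we've already created a `connection` object...
sync_result = connection.run_sync(wait=False)

while not sync_result.is_job_complete():
    time.sleep(5)

# Refresh the status and raise `AirbyteConnectionSyncError` if the job failed:
sync_result.raise_failure_status(refresh_status=True)
```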
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```
Wait for a job to finish running.
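A usage sketch with a custom timeout; the parameter values here are illustrative only:

```python
# Wait up to 10 minutes. On timeout, return the (possibly non-final) status
# instead of raising; if the job finishes in a failed state, raise an error:
status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_timeout=False,
    raise_failure=True,
)
print(f"Status after waiting: {status}")
```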
```python
def get_sql_cache(self) -> CacheBase:
    """Return a SQL Cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration: dict[str, Any] = self._get_destination_configuration()
    self._cache = create_cache_from_destination_config(
        destination_configuration=destination_configuration
    )
    return self._cache
```
Return a SQL Cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
```
Return a SQL Engine for querying a SQL-based destination.
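For example, the engine can be handed to pandas to read a synced table. A sketch, assuming
pandas is installed; the `"users"` stream name is hypothetical:

```python
import pandas as pd

engine = sync_result.get_sql_engine()
df = pd.read_sql_table(
    sync_result.get_sql_table_name("users"),  # hypothetical stream name
    engine,
    schema=sync_result.get_sql_schema_name(),
)
```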
```python
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```
Return the SQL table name of the named stream.
```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
```
Return a SQLAlchemy table object for the named stream.
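A sketch of querying the table with SQLAlchemy; the `"users"` stream name is hypothetical:

```python
import sqlalchemy

users_table = sync_result.get_sql_table("users")  # hypothetical stream name
with sync_result.get_sql_engine().connect() as conn:
    row_count = conn.execute(
        sqlalchemy.select(sqlalchemy.func.count()).select_from(users_table)
    ).scalar()
print(f"Row count: {row_count}")
```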
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the `CachedDataset` object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
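Since the `streams` property (below) is a convenience wrapper around this method, the following
two lines yield the same dataset; the `"users"` stream name is hypothetical:

```python
dataset = sync_result.get_dataset("users")
dataset = sync_result.streams["users"]  # equivalent, via the mapping interface
```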
```python
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
```
Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
```
Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names."""
    return self.connection.stream_names
```
Return the list of stream names.
```python
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and the `get_dataset()` method.
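A sketch of iterating over all streams via the mapping interface:

```python
# Iterate over stream names and their cached datasets:
for stream_name, dataset in sync_result.streams.items():
    first_record = next(iter(dataset), None)
    print(f"{stream_name}: first record = {first_record}")
```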