airbyte.cloud
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.
Examples
Basic Usage Example:
import airbyte as ab
from airbyte import cloud
# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
workspace_id="123",
api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)
# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
Example Read From Cloud Destination:
If your destination is supported, you can read records directly from the `SyncResult` object. Currently this is supported in Snowflake and BigQuery only.
# Assuming we've already created a `connection` object...
# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)
# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")
# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")
# Or iterate over the dataset directly
for record in dataset:
print(record)
ℹ️ Experimental Features
You can use the airbyte.cloud.experimental
module to access experimental features.
These additional features are subject to change and may not be available in all environments.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API. 3 4You can use this module to interact with Airbyte Cloud, OSS, and Enterprise. 5 6## Examples 7 8### Basic Usage Example: 9 10```python 11import airbyte as ab 12from airbyte import cloud 13 14# Initialize an Airbyte Cloud workspace object 15workspace = cloud.CloudWorkspace( 16 workspace_id="123", 17 api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"), 18) 19 20# Run a sync job on Airbyte Cloud 21connection = workspace.get_connection(connection_id="456") 22sync_result = connection.run_sync() 23print(sync_result.get_job_status()) 24``` 25 26### Example Read From Cloud Destination: 27 28If your destination is supported, you can read records directly from the 29`SyncResult` object. Currently this is supported in Snowflake and BigQuery only. 30 31 32```python 33# Assuming we've already created a `connection` object... 34 35# Get the latest job result and print the stream names 36sync_result = connection.get_sync_result() 37print(sync_result.stream_names) 38 39# Get a dataset from the sync result 40dataset: CachedDataset = sync_result.get_dataset("users") 41 42# Get a SQLAlchemy table to use in SQL queries... 43users_table = dataset.to_sql_table() 44print(f"Table name: {users_table.name}") 45 46# Or iterate over the dataset directly 47for record in dataset: 48 print(record) 49``` 50 51ℹ️ **Experimental Features** 52 53You can use the `airbyte.cloud.experimental` module to access experimental features. 54These additional features are subject to change and may not be available in all environments. 
55""" # noqa: RUF002 # Allow emoji 56 57from __future__ import annotations 58 59from airbyte.cloud import connections, constants, sync_results, workspaces 60from airbyte.cloud.connections import CloudConnection 61from airbyte.cloud.constants import JobStatusEnum 62from airbyte.cloud.sync_results import SyncResult 63from airbyte.cloud.workspaces import CloudWorkspace 64 65 66__all__ = [ 67 # Submodules 68 "workspaces", 69 "connections", 70 "constants", 71 "sync_results", 72 # Classes 73 "CloudWorkspace", 74 "CloudConnection", 75 "SyncResult", 76 # Enums 77 "JobStatusEnum", 78]
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    api_key: str
    api_root: str = CLOUD_API_ROOT

    @property
    def workspace_url(self) -> str | None:
        """Return the web URL of this workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        # The fetched workspace data is discarded; a successful call is the check.
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy and delete sources

    # TODO: Make this a public API
    def _deploy_source(
        self,
        source: Source,
    ) -> str:
        """Deploy a source to the workspace.

        Returns the newly deployed source ID.
        """
        source_configuration = source.get_config().copy()
        source_configuration["sourceType"] = source.name.replace("source-", "")

        deployed_source = create_source(
            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=source_configuration,
        )

        # Set the deployment Ids on the source object
        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API

        return deployed_source.source_id

    def _permanently_delete_source(
        self,
        source: str | Source,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, (str, Source)):
            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

        if isinstance(source, Source):
            if not source._deployed_source_id:  # noqa: SLF001
                raise ValueError("Source has not been deployed.")  # noqa: TRY003

            source_id = source._deployed_source_id  # noqa: SLF001

        elif isinstance(source, str):
            source_id = source

        delete_source(
            source_id=source_id,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete destinations

    # TODO: Make this a public API
    def _deploy_cache_as_destination(
        self,
        cache: CacheBase,
    ) -> str:
        """Deploy a cache to the workspace as a new destination.

        Returns the newly deployed destination ID.
        """
        cache_type_name = cache.__class__.__name__.replace("Cache", "")

        deployed_destination: DestinationResponse = create_destination(
            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=get_destination_config_from_cache(cache),
        )

        # Set the deployment Ids on the cache object
        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API

        return deployed_destination.destination_id

    def _permanently_delete_destination(
        self,
        *,
        destination: str | None = None,
        cache: CacheBase | None = None,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.
        """
        if destination is None and cache is None:
            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
        if destination is not None and cache is not None:
            raise ValueError(  # noqa: TRY003
                "You must provide either a destination ID or a cache object, not both."
            )

        if cache:
            if not cache._deployed_destination_id:  # noqa: SLF001
                raise ValueError("Cache has not been deployed.")  # noqa: TRY003

            destination = cache._deployed_destination_id  # noqa: SLF001

        if destination is None:
            raise ValueError("No destination ID provided.")  # noqa: TRY003

        delete_destination(
            destination_id=destination,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete connections

    # TODO: Make this a public API
    def _deploy_connection(
        self,
        source: Source | str,
        cache: CacheBase | None = None,
        destination: str | None = None,
        table_prefix: str | None = None,
        selected_streams: list[str] | None = None,
    ) -> CloudConnection:
        """Deploy a source and cache to the workspace as a new connection.

        Returns the newly deployed connection ID as a `str`.

        Args:
            source (Source | str): The source to deploy. You can pass either an already deployed
                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
                it will be deployed automatically.
            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
                `cache` or `destination`, but not both.
            destination (str, optional): The destination ID to use. You can provide
                `cache` or `destination`, but not both.
            table_prefix (str, optional): Prefix applied to destination table names. Defaults to
                the cache's `table_prefix` when a cache is given.
            selected_streams (list[str], optional): Streams to sync. Required when `source` is
                a source ID; defaults to the source's selected streams otherwise.
        """
        # Resolve source ID
        source_id: str
        if isinstance(source, Source):
            selected_streams = selected_streams or source.get_selected_streams()
            if source._deployed_source_id:  # noqa: SLF001
                source_id = source._deployed_source_id  # noqa: SLF001
            else:
                source_id = self._deploy_source(source)
        else:
            source_id = source
            if not selected_streams:
                raise exc.PyAirbyteInputError(
                    guidance="You must provide `selected_streams` when deploying a source ID."
                )

        # Resolve destination ID
        destination_id: str
        if destination:
            destination_id = destination
        elif cache:
            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
            if not cache._deployed_destination_id:  # noqa: SLF001
                destination_id = self._deploy_cache_as_destination(cache)
            else:
                destination_id = cache._deployed_destination_id  # noqa: SLF001
        else:
            raise exc.PyAirbyteInputError(
                guidance="You must provide either a destination ID or a cache object."
            )

        assert source_id is not None
        assert destination_id is not None

        deployed_connection = create_connection(
            name="Connection (Deployed by PyAirbyte)",
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
        )

        # Record the deployment IDs back onto the provided objects.
        if isinstance(source, Source):
            source._deployed_api_root = self.api_root  # noqa: SLF001
            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            source._deployed_source_id = source_id  # noqa: SLF001
        if cache:
            cache._deployed_api_root = self.api_root  # noqa: SLF001
            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def _permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")  # noqa: TRY003

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        if delete_source:
            self._permanently_delete_source(source=connection.source_id)

        if delete_destination:
            self._permanently_delete_destination(destination=connection.destination_id)

    # Run syncs

    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Get sync results and previous sync logs

    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        # Fix: previously an unused `CloudConnection` was constructed before this
        # branch and then constructed a second time below (dead code).
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
def connect(self) -> None:
    """Verify that the workspace is reachable and that credentials are valid.

    Calling this before other operations is optional; it exists purely as a
    lightweight reachability/credentials check and raises on failure.
    """
    # Fetch the workspace solely for its side effect: a failed lookup raises.
    get_workspace(
        workspace_id=self.workspace_id,
        api_key=self.api_key,
        api_root=self.api_root,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    No API call is made here; the returned `CloudConnection` fetches its
    data lazily when first needed.
    """
    return CloudConnection(workspace=self, connection_id=connection_id)
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
def run_sync(
    self,
    connection_id: str,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Run a sync on a deployed connection."""
    # Build a lazy connection handle and delegate the sync to it.
    return CloudConnection(
        workspace=self,
        connection_id=connection_id,
    ).run_sync(wait=wait, wait_timeout=wait_timeout)
Run a sync on a deployed connection.
def get_sync_result(
    self,
    connection_id: str,
    job_id: str | None = None,
) -> SyncResult | None:
    """Get the sync result for a connection job.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if `job_id` is omitted and no previous jobs are found.
    """
    # Fix: the original built an unused `CloudConnection` before this branch
    # and then built a second, identical one below — the first was dead code.
    if job_id is None:
        results = self.get_previous_sync_logs(
            connection_id=connection_id,
            limit=1,
        )
        if results:
            return results[0]

        return None

    connection = CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
    return SyncResult(
        workspace=self,
        connection=connection,
        job_id=job_id,
    )
Get the sync result for a connection job.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.
def get_previous_sync_logs(
    self,
    connection_id: str,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Get the previous sync logs for a connection."""
    # Delegate to the connection object, which owns the log-retrieval logic.
    handle = CloudConnection(workspace=self, connection_id=connection_id)
    return handle.get_previous_sync_logs(limit=limit)
Get the previous sync logs for a connection.
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Fix: this previously read `source_id`, so the property returned the
            # source's ID whenever the destination ID had to be fetched lazily.
            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        """Return the web URL of this connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """Return the web URL of this connection's job history."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=self.source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=self.destination_id,
            )
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """It is not recommended to create a `CloudConnection` object directly.

    Instead, use `CloudWorkspace.get_connection()` to create a connection object.
    """
    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self.connection_id = connection_id
    """The ID of the connection."""

    self._source_id = source
    """The ID of the source."""

    self._destination_id = destination
    """The ID of the destination."""

    # Lazily populated from the API on first property access.
    self._connection_info: ConnectionResponse | None = None
It is not recommended to create a `CloudConnection` object directly. Instead, use `CloudWorkspace.get_connection()` to create a connection object.
@property
def source_id(self) -> str:
    """The ID of the source."""
    if not self._source_id:
        # Lazily fetch connection info the first time the ID is needed.
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()
        self._source_id = self._connection_info.source_id
    return cast(str, self._source_id)
The ID of the source.
@property
def destination_id(self) -> str:
    """The ID of the destination."""
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        # Fix: this previously read `source_id`, so the property returned the
        # source's ID whenever the destination ID had to be fetched lazily.
        self._destination_id = self._connection_info.destination_id

    return cast(str, self._destination_id)
The ID of the destination.
@property
def stream_names(self) -> list[str]:
    """The stream names."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    configured_streams = self._connection_info.configurations.streams
    return [configured.name for configured in configured_streams]
The stream names.
@property
def table_prefix(self) -> str:
    """The table prefix."""
    info = self._connection_info
    if not info:
        # Cache the fetched info so later property accesses reuse it.
        info = self._connection_info = self._fetch_connection_info()
    return info.prefix
The table prefix.
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Run a sync."""
    response = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        api_key=self.workspace.api_key,
        workspace_id=self.workspace.workspace_id,
    )
    result = SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=response.job_id,
    )
    # Optionally block until the job reaches a terminal state.
    if wait:
        result.wait_for_completion(
            wait_timeout=wait_timeout,
            raise_failure=True,
            raise_timeout=True,
        )
    return result
Run a sync.
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Get the previous sync logs for a connection."""
    job_responses: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        api_key=self.workspace.api_key,
        workspace_id=self.workspace.workspace_id,
        limit=limit,
    )
    results: list[SyncResult] = []
    for job in job_responses:
        # Pre-populate each result with the job info we already have.
        results.append(
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=job.job_id,
                _latest_job_info=job,
            )
        )
    return results
Get the previous sync logs for a connection.
def get_sync_result(
    self,
    job_id: str | None = None,
) -> SyncResult | None:
    """Get the sync result for the connection.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if job_id is omitted and no previous jobs are found.
    """
    if job_id is not None:
        # Known job ID: return a lazily-loaded result handle.
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # No job ID: fall back to the most recent job, if any.
    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: str
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            connection_id=self.connection.connection_id,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return destination_response.configuration

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Check if the sync job is still running."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        # Reuse the cached info once the job reaches a terminal state.
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes processed."""
        return self._fetch_latest_job_info().bytes_synced

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise a `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination's."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        # Fix: the engine was previously created but never returned (always None).
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        # Fix: the table was previously looked up but never returned (always None).
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the set of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            """Iterate over the available stream names."""
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult
by
interacting with the airbyte.cloud.CloudWorkspace
and airbyte.cloud.CloudConnection
objects.
@property
def job_url(self) -> str:
    """Return the URL of this sync job in the connection's job history."""
    base_url = self.connection.job_history_url
    return f"{base_url}/{self.job_id}"
Return the URL of the sync job.
def is_job_complete(self) -> bool:
    """Check if the sync job has reached a final (terminal) state."""
    current_status = self.get_job_status()
    return current_status in FINAL_STATUSES
Check if the sync job is complete.
def get_job_status(self) -> "JobStatusEnum":
    """Return the latest status of the sync job.

    Note: the previous docstring ("Check if the sync job is still running.") did not match
    the behavior; this method returns the job's status enum, not a boolean.
    """
    return self._fetch_latest_job_info().status
Return the latest status of the sync job.
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced by the job.

    Fixed docstring: previously described as "number of records processed", but this
    property reads `bytes_synced` from the job info.
    """
    return self._fetch_latest_job_info().bytes_synced
Return the number of bytes synced.
@property
def records_synced(self) -> int:
    """Return the number of records processed by the job."""
    job_info = self._fetch_latest_job_info()
    return job_info.rows_synced
Return the number of records processed.
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    # The job info reports the start time as an ISO 8601 string.
    iso_timestamp = self._fetch_latest_job_info().start_time
    return datetime.fromisoformat(iso_timestamp)
Return the start time of the sync job in UTC.
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise a `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    # Re-fetch the status when asked to, or when nothing is cached yet:
    if refresh_status or not self._latest_job_info:
        latest_status = self.get_job_status()
    else:
        latest_status = self._latest_job_info.status

    if latest_status not in FAILED_STATUSES:
        return  # Job succeeded or is still running: nothing to do.

    raise AirbyteConnectionSyncError(
        workspace=self.workspace,
        connection_id=self.connection.connection_id,
        job_id=self.job_id,
        job_status=self.get_job_status(),
    )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the
status before checking for failure, set refresh_status=True
. If the job has failed, this
method will raise a AirbyteConnectionSyncError
.
Otherwise, do nothing.
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    # Poll until the job reaches a final state or the deadline passes.
    deadline = time.time() + wait_timeout
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() > deadline:
            if not raise_timeout:
                return latest_status  # This will be a non-final status

            raise AirbyteConnectionSyncTimeoutError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=latest_status,
                timeout=wait_timeout,
            )

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
def get_sql_cache(self) -> "CacheBase":
    """Return a SQL cache object for working with the data in a SQL-based destination.

    The cache is created lazily from the destination configuration on first access and
    memoized on `self._cache` for subsequent calls.
    (Fixed docstring typo: "destination's" -> "destination".)
    """
    if self._cache:
        return self._cache

    destination_configuration: dict[str, Any] = self._get_destination_configuration()
    self._cache = create_cache_from_destination_config(
        destination_configuration=destination_configuration
    )
    return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination.
def get_sql_engine(self) -> "sqlalchemy.engine.Engine":
    """Return a SQL Engine for querying a SQL-based destination.

    Bug fix: the engine is now returned to the caller. Previously the result of
    `get_sql_engine()` was discarded, so this method always returned `None` despite
    its `-> sqlalchemy.engine.Engine` annotation.
    """
    return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    cache = self.get_sql_cache()
    return cache.processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
def get_sql_table(
    self,
    stream_name: str,
) -> "sqlalchemy.Table":
    """Return a SQLAlchemy table object for the named stream.

    Bug fix: the table is now returned to the caller. Previously the result of
    `get_sql_table()` was discarded, so this method always returned `None` despite
    its `-> sqlalchemy.Table` annotation.
    """
    return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    cache = self.get_sql_cache()
    # Don't look for stream configuration in cache:
    return CachedDataset(cache, stream_name=stream_name, stream_configuration=False)
Retrieve an airbyte.datasets.CachedDataset
object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the CachedDataset
object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    return self.get_sql_cache().get_database_name()
Return the SQL database name.
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    return self.get_sql_cache().schema_name
Return the SQL schema name.
@property
def stream_names(self) -> list[str]:
    """Return the list of stream names for this connection.

    Fixed docstring: previously said "set of stream names", but the return type is a list.
    """
    return self.connection.stream_names
Return the list of stream names.
@final
@property
def streams(self) -> "_SyncResultStreams":
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names` property and the
    `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset
objects.
This is a convenience wrapper around the stream_names
property and get_dataset()
method.
class JobStatusEnum(str, Enum):
    """Enumeration of the possible statuses of an Airbyte sync job.

    Inherits from `str` so members compare equal to their plain string values
    (e.g. `JobStatusEnum.FAILED == 'failed'`).
    """

    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
Enumeration of the possible statuses of an Airbyte sync job.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans