# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_CLOUD_API_KEY"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.


```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
"""  # noqa: RUF002  # Allow emoji

from __future__ import annotations

from airbyte.cloud import connections, constants, sync_results, workspaces
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    api_key: str
    api_root: str = CLOUD_API_ROOT

    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy and delete sources

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_source(
        self,
        source: Source,
    ) -> str:
        """Deploy a source to the workspace.

        Returns the newly deployed source ID.
        """
        source_configuration = source.get_config().copy()
        source_configuration["sourceType"] = source.name.replace("source-", "")

        deployed_source = create_source(
            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=source_configuration,
        )

        # Set the deployment IDs on the source object
        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing non-public API

        return deployed_source.source_id

    def _permanently_delete_source(
        self,
        source: str | Source,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.
        """
        if not isinstance(source, str | Source):
            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

        if isinstance(source, Source):
            if not source._deployed_source_id:  # noqa: SLF001
                raise ValueError("Source has not been deployed.")  # noqa: TRY003

            source_id = source._deployed_source_id  # noqa: SLF001

        elif isinstance(source, str):
            source_id = source

        delete_source(
            source_id=source_id,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete destinations

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_cache_as_destination(
        self,
        cache: CacheBase,
    ) -> str:
        """Deploy a cache to the workspace as a new destination.

        Returns the newly deployed destination ID.
        """
        cache_type_name = cache.__class__.__name__.replace("Cache", "")

        deployed_destination: DestinationResponse = create_destination(
            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=get_destination_config_from_cache(cache),
        )

        # Set the deployment IDs on the cache object
        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing non-public API

        return deployed_destination.destination_id

    def _permanently_delete_destination(
        self,
        *,
        destination: str | None = None,
        cache: CacheBase | None = None,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.
        """
        if destination is None and cache is None:
            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
        if destination is not None and cache is not None:
            raise ValueError(  # noqa: TRY003
                "You must provide either a destination ID or a cache object, not both."
            )

        if cache:
            if not cache._deployed_destination_id:  # noqa: SLF001
                raise ValueError("Cache has not been deployed.")  # noqa: TRY003

            destination = cache._deployed_destination_id  # noqa: SLF001

        if destination is None:
            raise ValueError("No destination ID provided.")  # noqa: TRY003

        delete_destination(
            destination_id=destination,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete connections

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_connection(
        self,
        source: Source | str,
        cache: CacheBase | None = None,
        destination: str | None = None,
        table_prefix: str | None = None,
        selected_streams: list[str] | None = None,
    ) -> CloudConnection:
        """Deploy a source and cache to the workspace as a new connection.

        Returns a `CloudConnection` object for the newly deployed connection.

        Args:
            source (Source | str): The source to deploy. You can pass either an already deployed
                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
                it will be deployed automatically.
            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
                `cache` or `destination`, but not both.
            destination (str, optional): The destination ID to use. You can provide
                `cache` or `destination`, but not both.
            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
                the cache's table prefix will be used.
            selected_streams (list[str], optional): The selected stream names to use for the
                connection. If not provided, the source's selected streams will be used.
        """
        # Resolve source ID
        source_id: str
        if isinstance(source, Source):
            selected_streams = selected_streams or source.get_selected_streams()
            source_id = (
                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
                or self._deploy_source(source)
            )
        else:
            source_id = source
            if not selected_streams:
                raise exc.PyAirbyteInputError(
                    guidance="You must provide `selected_streams` when deploying a source ID."
                )

        # Resolve destination ID
        destination_id: str
        if destination:
            destination_id = destination
        elif cache:
            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
            if not cache._deployed_destination_id:  # noqa: SLF001
                destination_id = self._deploy_cache_as_destination(cache)
            else:
                destination_id = cache._deployed_destination_id  # noqa: SLF001
        else:
            raise exc.PyAirbyteInputError(
                guidance="You must provide either a destination ID or a cache object."
            )

        assert source_id is not None
        assert destination_id is not None

        deployed_connection = create_connection(
            name="Connection (Deployed by PyAirbyte)",
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
        )

        if isinstance(source, Source):
            source._deployed_api_root = self.api_root  # noqa: SLF001
            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            source._deployed_source_id = source_id  # noqa: SLF001
        if cache:
            cache._deployed_api_root = self.api_root  # noqa: SLF001
            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def _permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")  # noqa: TRY003

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        if delete_source:
            self._permanently_delete_source(source=connection.source_id)

        if delete_destination:
            self._permanently_delete_destination(destination=connection.destination_id)

    # Run syncs

    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Get sync results and previous sync logs

    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )
```
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
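For example, a minimal sketch of pointing the client at a self-managed instance (the API root URL and secret name below are placeholders, not real endpoints):

```python
import airbyte as ab
from airbyte.cloud import CloudWorkspace

# Hypothetical self-managed deployment; substitute your own API root.
workspace = CloudWorkspace(
    workspace_id="123",
    api_key=ab.get_secret("AIRBYTE_API_KEY"),  # Secret name is illustrative
    api_root="https://airbyte.internal.example.com/api/public/v1",
)
```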
```python
    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"
```
The URL of the workspace.
```python
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
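A quick sanity check, reusing the `workspace` object from the module examples:

```python
# Raises if the workspace is unreachable or the credentials are invalid;
# otherwise prints a confirmation with the workspace URL.
workspace.connect()
```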
```python
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
```python
    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
```
Run a sync on a deployed connection.
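For instance, a sketch that triggers a sync by connection ID and blocks for up to ten minutes (the ID is a placeholder):

```python
sync_result = workspace.run_sync(
    connection_id="456",
    wait=True,
    wait_timeout=600,  # Seconds to wait before a timeout error is raised
)
print(sync_result.get_job_status())
```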
```python
    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )
```
Get the sync result for a connection job.
If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
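A short sketch of handling the `None` case (the connection ID is a placeholder):

```python
latest = workspace.get_sync_result(connection_id="456")
if latest is None:
    print("No sync jobs found for this connection.")
else:
    print(f"Latest job {latest.job_id}: {latest.get_job_status()}")
```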
```python
    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )
```
Get the previous sync logs for a connection.
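For example, listing recent jobs and their statuses (the connection ID is a placeholder):

```python
for result in workspace.get_previous_sync_logs(connection_id="456", limit=5):
    print(result.job_id, result.get_job_status())
```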
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=self.source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=self.destination_id,
            )
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
```
It is not recommended to create a `CloudConnection` object directly.

Instead, use `CloudWorkspace.get_connection()` to create a connection object.
```python
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)
```
The ID of the source.
```python
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)
```
The ID of the destination.
```python
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]
```
The stream names.
```python
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix
```
The table prefix.
```python
    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
```
The URL to the connection.
```python
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"
```
The URL to the job history for the connection.
```python
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result
```
Run a sync.
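A non-blocking sketch, assuming a `connection` object from `workspace.get_connection(...)`:

```python
# Kick off the sync without waiting, then block explicitly with custom settings.
sync_result = connection.run_sync(wait=False)
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,  # Raise if the job ends in a failed state
)
print(final_status)
```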
```python
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]
```
Get the previous sync logs for a connection.
```python
    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )
```
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
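For example, looking up a specific job by ID (the job ID is a placeholder; the result is lazily loaded, so no API call is made until it is inspected):

```python
result = connection.get_sync_result(job_id="789")
print(result.job_url)
```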
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: str
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            connection_id=self.connection.connection_id,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return destination_response.configuration

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.
**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `CloudWorkspace` and `CloudConnection` objects.
```python
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"
```
Return the URL of the sync job.
```python
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES
```
Check if the sync job is complete.
```python
    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status
```
Return the status of the sync job.
```python
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced
```
Return the number of bytes synced.
```python
    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced
```
Return the number of records synced.
```python
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
```
Return the start time of the sync job in UTC.
```python
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )
```
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
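A defensive-usage sketch; the exception's import path from `airbyte.exceptions` is an assumption here:

```python
from airbyte.exceptions import AirbyteConnectionSyncError  # Import path assumed

sync_result = connection.get_sync_result()
if sync_result is not None:
    try:
        # Refresh the status first, in case the job was still running.
        sync_result.raise_failure_status(refresh_status=True)
    except AirbyteConnectionSyncError as ex:
        print(f"Sync failed: {ex}")
```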
```python
    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```
Wait for a job to finish running.
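For example, polling without raising on timeout; a non-final status is returned if the timeout elapses:

```python
status = sync_result.wait_for_completion(
    wait_timeout=120,
    raise_timeout=False,  # Return the current (possibly non-final) status instead
)
print(f"Status after waiting: {status}")
```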
```python
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration: dict[str, Any] = self._get_destination_configuration()
        self._cache = create_cache_from_destination_config(
            destination_configuration=destination_configuration
        )
        return self._cache
```
Return a SQL Cache object for working with the data in a SQL-based destination.
```python
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()
```
Return a SQL Engine for querying a SQL-based destination.
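A sketch of ad-hoc SQL against a supported destination (Snowflake or BigQuery); the `users` stream name is illustrative:

```python
import sqlalchemy

engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(f"Rows in {table_name}: {row_count}")
```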
```python
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```
Return the SQL table name of the named stream.
```python
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)
```
Return a SQLAlchemy table object for the named stream.
```python
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
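For example, pulling a stream into pandas; this assumes `CachedDataset` exposes the same `to_pandas()` helper as PyAirbyte datasets elsewhere, and that a `users` stream exists:

```python
dataset = sync_result.get_dataset("users")
df = dataset.to_pandas()  # Conversion helper assumed from PyAirbyte's dataset API
print(df.head())
```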
```python
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()
```
Return the SQL database name.
```python
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name
```
Return the SQL schema name.
```python
    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names
```
Return the list of stream names.
```python
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
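For instance, treating the sync result as a read-only mapping of stream name to dataset:

```python
# `streams` behaves like a Mapping[str, CachedDataset], so .items() works.
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name} -> table {dataset.to_sql_table().name}")
```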
```python
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
```
An enumeration of the possible statuses of an Airbyte sync job.
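A small sketch of branching on the enum, reusing a `sync_result` from the examples above:

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status is JobStatusEnum.SUCCEEDED:
    print("Sync finished successfully.")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED):
    print(f"Sync did not complete: {status.value}")
```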