# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)
```
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
```python
@property
def workspace_url(self) -> str | None:
    """The URL of the workspace."""
    return f"{self.api_root}/workspaces/{self.workspace_id}"
```
The URL of the workspace.
```python
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: It is not necessary to call this method before calling other operations. It
    serves primarily as a simple check to ensure that the workspace is reachable
    and credentials are correct.
    """
    _ = api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
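For example, a quick credentials smoke test might look like this sketch (the secret names are illustrative):

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),          # Illustrative secret name
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),  # Illustrative secret name
)
workspace.connect()  # Raises if the workspace is unreachable or credentials are wrong
```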
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    This method does not fetch data from the API. It returns a `CloudConnection` object,
    which will be loaded lazily as needed.
    """
    return CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Get a source by ID.

    This method does not fetch data from the API. It returns a `CloudSource` object,
    which will be loaded lazily as needed.
    """
    return CloudSource(
        workspace=self,
        connector_id=source_id,
    )
```
Get a source by ID.
This method does not fetch data from the API. It returns a `CloudSource` object, which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Get a destination by ID.

    This method does not fetch data from the API. It returns a `CloudDestination` object,
    which will be loaded lazily as needed.
    """
    return CloudDestination(
        workspace=self,
        connector_id=destination_id,
    )
```
Get a destination by ID.
This method does not fetch data from the API. It returns a `CloudDestination` object, which will be loaded lazily as needed.
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    source_config_dict = source.get_config().copy()
    source_config_dict["sourceType"] = source.name.replace("source-", "")

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_sources(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="source",
                resource_name=name,
            )

    deployed_source = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=source_config_dict,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudSource(
        workspace=self,
        connector_id=deployed_source.source_id,
    )
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:

- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
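For instance, deploying a locally configured source might look like the following sketch (the connector name and config are illustrative):

```python
import airbyte as ab

# Configure a source locally, then deploy it to the workspace
source = ab.get_source("source-faker", config={"count": 100})
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # Helps avoid duplicate-name errors
)
print(cloud_source.connector_id)
```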
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
            dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination.get_config().copy()
        destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
    else:
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_destinations(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="destination",
                resource_name=name,
            )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:

- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID `str` or a deployed `CloudSource` object.
    """
    if not isinstance(source, (str, CloudSource)):
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=source.connector_id if isinstance(source, CloudSource) else source,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
```
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either the destination ID as a `str` or a deployed `CloudDestination` object.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        destination_id=(
            destination if isinstance(destination, str) else destination.destination_id
        ),
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
```
Delete a deployed destination from the workspace.
You can pass either the destination ID as a `str` or a deployed `CloudDestination` object.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a new connection between an already deployed source and destination.

    Returns the newly deployed connection object.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a CloudSource object.
        destination: The deployed destination. You can pass a destination ID or a
            CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
        selected_streams: The selected stream names to sync within the connection.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    source_id: str = source if isinstance(source, str) else source.connector_id
    destination_id: str = (
        destination if isinstance(destination, str) else destination.connector_id
    )

    deployed_connection = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    return CloudConnection(
        workspace=self,
        connection_id=deployed_connection.connection_id,
        source=deployed_connection.source_id,
        destination=deployed_connection.destination_id,
    )
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
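Putting the deploy methods together, an end-to-end sketch might look like the following (the connector type, config keys, and stream names are illustrative):

```python
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",              # Required when passing a dict
        "destination_path": "/tmp/mydb.duckdb",   # Illustrative config key
    },
)
connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,            # From the deploy_source() sketch above
    destination=cloud_destination,  # A CloudDestination object or an ID str
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
```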
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete a deployed connection from the workspace."""
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    api_util.delete_connection(
        connection_id=connection.connection_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    if cascade_delete_source:
        self.permanently_delete_source(source=connection.source_id)
    if cascade_delete_destination:
        self.permanently_delete_destination(destination=connection.destination_id)
```
Delete a deployed connection from the workspace.
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
    """List connections by name in the workspace."""
    connections = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudConnection(
            workspace=self,
            connection_id=connection.connection_id,
            source=None,
            destination=None,
        )
        for connection in connections
        if name is None or connection.name == name
    ]
```
List connections by name in the workspace.
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
    """List all sources in the workspace."""
    sources = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudSource(
            workspace=self,
            connector_id=source.source_id,
        )
        for source in sources
        if name is None or source.name == name
    ]
```
List all sources in the workspace.
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace."""
    destinations = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudDestination(
            workspace=self,
            connector_id=destination.destination_id,
        )
        for destination in destinations
        if name is None or destination.name == name
    ]
```
List all destinations in the workspace.
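As a sketch, and assuming `name_filter` receives each resource name as a `str`, the list methods can be used like this:

```python
# Exact-name lookup
faker_sources = workspace.list_sources(name="My Faker Source")

# Predicate-based lookup (assumes the callable receives the resource name)
prod_destinations = workspace.list_destinations(
    name_filter=lambda name: name.startswith("Prod"),
)
```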
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """It is not recommended to create a `CloudConnection` object directly.

    Instead, use `CloudWorkspace.get_connection()` to create a connection object.
    """
    self.connection_id = connection_id
    """The ID of the connection."""

    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self._source_id = source
    """The ID of the source."""

    self._destination_id = destination
    """The ID of the destination."""

    self._connection_info: ConnectionResponse | None = None
    """The connection info object. (Cached.)"""

    self._cloud_source_object: CloudSource | None = None
    """The source object. (Cached.)"""

    self._cloud_destination_object: CloudDestination | None = None
    """The destination object. (Cached.)"""
```
It is not recommended to create a `CloudConnection` object directly.

Instead, use `CloudWorkspace.get_connection()` to create a connection object.
```python
@property
def source_id(self) -> str:
    """The ID of the source."""
    if not self._source_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        self._source_id = self._connection_info.source_id

    return cast("str", self._source_id)
```
The ID of the source.
```python
@property
def source(self) -> CloudSource:
    """Get the source object."""
    if self._cloud_source_object:
        return self._cloud_source_object

    self._cloud_source_object = CloudSource(
        workspace=self.workspace,
        connector_id=self.source_id,
    )
    return self._cloud_source_object
```
Get the source object.
```python
@property
def destination_id(self) -> str:
    """The ID of the destination."""
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        self._destination_id = self._connection_info.destination_id

    return cast("str", self._destination_id)
```
The ID of the destination.
```python
@property
def destination(self) -> CloudDestination:
    """Get the destination object."""
    if self._cloud_destination_object:
        return self._cloud_destination_object

    self._cloud_destination_object = CloudDestination(
        workspace=self.workspace,
        connector_id=self.destination_id,
    )
    return self._cloud_destination_object
```
Get the destination object.
```python
@property
def stream_names(self) -> list[str]:
    """The stream names."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    return [stream.name for stream in self._connection_info.configurations.streams or []]
```
The stream names.
```python
@property
def table_prefix(self) -> str:
    """The table prefix."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    return self._connection_info.prefix or ""
```
The table prefix.
```python
@property
def connection_url(self) -> str | None:
    """The URL to the connection."""
    return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
```
The URL to the connection.
```python
@property
def job_history_url(self) -> str | None:
    """The URL to the job history for the connection."""
    return f"{self.connection_url}/job-history"
```
The URL to the job history for the connection.
```python
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Run a sync."""
    connection_response = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        workspace_id=self.workspace.workspace_id,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
    )
    sync_result = SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=connection_response.job_id,
    )

    if wait:
        sync_result.wait_for_completion(
            wait_timeout=wait_timeout,
            raise_failure=True,
            raise_timeout=True,
        )

    return sync_result
```
Run a sync.
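For example, using the parameters above:

```python
# Block until the sync finishes, allowing up to 15 minutes
sync_result = connection.run_sync(wait=True, wait_timeout=900)

# Or start the job without blocking and check on it later
sync_result = connection.run_sync(wait=False)
print(sync_result.get_job_status())
```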
```python
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Get the previous sync logs for a connection."""
    sync_logs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        workspace_id=self.workspace.workspace_id,
        limit=limit,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
    )
    return [
        SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=sync_log.job_id,
            _latest_job_info=sync_log,
        )
        for sync_log in sync_logs
    ]
```
Get the previous sync logs for a connection.
```python
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Get the sync result for the connection.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if job_id is omitted and no previous jobs are found.
    """
    if job_id is None:
        # Get the most recent sync job
        results = self.get_previous_sync_logs(
            limit=1,
        )
        if results:
            return results[0]

        return None

    # Get the sync job by ID (lazy loaded)
    return SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=job_id,
    )
```
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
```python
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(self.source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(self.destination_id)
```
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.
**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
```python
@property
def job_url(self) -> str:
    """Return the URL of the sync job."""
    return f"{self.connection.job_history_url}/{self.job_id}"
```
Return the URL of the sync job.
```python
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
```
Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
    """Return the status of the sync job."""
    return self._fetch_latest_job_info().status
```
Return the status of the sync job.
```python
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced or 0
```
Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced or 0
```
Return the number of records synced.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    # Parse from ISO 8601 format:
    return datetime.fromisoformat(self._fetch_latest_job_info().start_time)
```
Return the start time of the sync job in UTC.
```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
```
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```
Wait for a job to finish running.
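A sketch of non-blocking usage with explicit timeout handling:

```python
# Start a sync without blocking, then wait with a custom timeout
result = connection.run_sync(wait=False)
final_status = result.wait_for_completion(
    wait_timeout=1800,    # Give up after 30 minutes
    raise_timeout=False,  # Return the non-final status instead of raising
    raise_failure=True,   # Raise AirbyteConnectionSyncError if the job failed
)
print(final_status)
```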
```python
def get_sql_cache(self) -> CacheBase:
    """Return a SQL Cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
```
Return a SQL Cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
```
Return a SQL Engine for querying a SQL-based destination.
```python
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```
Return the SQL table name of the named stream.
```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
```
Return a SQLAlchemy table object for the named stream.
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
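For example, assuming the standard `CachedDataset` interface, the returned dataset can be iterated or converted for analysis:

```python
dataset = sync_result.get_dataset("users")

# Iterate over records directly...
for record in dataset:
    print(record)

# ...or convert to a pandas DataFrame (assumes pandas is installed)
df = dataset.to_pandas()
print(df.head())
```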
```python
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
```
Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
```
Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
    """Return the set of stream names."""
    return self.connection.stream_names
```
Return the set of stream names.
```python
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
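Since `streams` behaves like a read-only mapping, you can iterate it directly (a sketch):

```python
# Print each synced stream and its backing SQL table name
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {dataset.to_sql_table().name}")
```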
```python
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
```
An enumeration of Airbyte Cloud job statuses.
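Because the enum inherits from `str`, members compare equal to their string values; a small sketch:

```python
from airbyte.cloud.constants import JobStatusEnum

status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync finished successfully.")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED):
    print(f"Sync did not complete: {status.value}")
```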