# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.


```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a `CloudSource` object.
            destination: The deployed destination. You can pass a destination ID or a
                `CloudDestination` object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace.

        TODO: Add pagination support
        """
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace.

        TODO: Add pagination support
        """
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace.

        TODO: Add pagination support
        """
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]
```
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
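For example, a minimal sketch of pointing the workspace at a self-managed instance (the URL and workspace ID below are illustrative placeholders):

```python
import airbyte as ab
from airbyte.cloud import CloudWorkspace

# Override `api_root` to target a self-managed Airbyte deployment.
workspace = CloudWorkspace(
    workspace_id="00000000-0000-0000-0000-000000000000",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    api_root="https://airbyte.internal.example.com/api/public/v1",
)
```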
```python
@property
def workspace_url(self) -> str | None:
```
The web URL of the workspace.
```python
def connect(self) -> None:
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
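For example, as an optional sanity check right after constructing the workspace:

```python
# Fails with an exception if the workspace ID or credentials are wrong;
# otherwise prints "Successfully connected to workspace: <workspace_url>".
workspace.connect()
```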
```python
def get_connection(self, connection_id: str) -> CloudConnection:
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
```python
def get_source(self, source_id: str) -> CloudSource:
```
Get a source by ID.
This method does not fetch data from the API. It returns a `CloudSource` object, which will be loaded lazily as needed.
```python
def get_destination(self, destination_id: str) -> CloudDestination:
```
Get a destination by ID.
This method does not fetch data from the API. It returns a `CloudDestination` object, which will be loaded lazily as needed.
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:

- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
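For example, a sketch of a typical deployment, assuming the `workspace` object from the earlier examples and using the `source-faker` connector (names and config values are illustrative):

```python
import airbyte as ab

# Configure a local source, then deploy it to the workspace.
local_source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",    # Must not collide, since `unique=True` by default
    source=local_source,
    random_name_suffix=True,   # Appends " (ID: <random>)" to avoid collisions
)
print(cloud_source.connector_id)
```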
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
```
Deploy a destination to the workspace.
Returns the newly deployed destination.

Arguments:

- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
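For example, a sketch of deploying from a raw configuration dictionary; note that `destinationType` is required in this form, and the connector type and config keys shown are illustrative:

```python
# Omitting "destinationType" from a dict config raises PyAirbyteInputError.
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",
        "destination_path": "/local/airbyte.duckdb",
    },
)
```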
```python
def permanently_delete_source(self, source: str | CloudSource) -> None:
```
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
```python
def permanently_delete_destination(self, destination: str | CloudDestination) -> None:
```
Delete a deployed destination from the workspace.
You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a `CloudSource` object.
- destination: The deployed destination. You can pass a destination ID or a `CloudDestination` object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
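For example, continuing the deployment sketch above (the stream names match `source-faker`; `selected_streams` must be non-empty or `PyAirbyteInputError` is raised):

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",
    source=cloud_source,             # or a source ID string
    destination=cloud_destination,   # or a destination ID string
    selected_streams=["users", "products", "purchases"],
    table_prefix="faker_",
)
print(connection.connection_id)
```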
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
```
Delete a deployed connection from the workspace.
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
```
List connections by name in the workspace.
TODO: Add pagination support
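For example, a sketch of both lookup styles, assuming `name_filter` receives each connection's name as a string:

```python
# Exact-match lookup by name:
connections = workspace.list_connections(name="faker-to-duckdb")

# Predicate-based lookup, e.g. every connection whose name starts with "prod-":
prod_connections = workspace.list_connections(
    name_filter=lambda name: name.startswith("prod-"),
)
```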
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
```
List all destinations in the workspace.
TODO: Add pagination support
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
```
It is not recommended to create a `CloudConnection` object directly.

Instead, use `CloudWorkspace.get_connection()` to create a connection object.
```python
@property
def source_id(self) -> str:
```
The ID of the source.
```python
@property
def source(self) -> CloudSource:
```
Get the source object.
```python
@property
def destination_id(self) -> str:
```
The ID of the destination.
```python
@property
def destination(self) -> CloudDestination:
```
Get the destination object.
```python
@property
def stream_names(self) -> list[str]:
```
The stream names.
```python
@property
def table_prefix(self) -> str:
```
The table prefix.
```python
@property
def connection_url(self) -> str | None:
```
The web URL to the connection.
```python
@property
def job_history_url(self) -> str | None:
```
The URL to the job history for the connection.
```python
def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
```
Run a sync.
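For example, a sketch of a non-blocking sync followed by an explicit wait (assuming a `connection` object from the earlier examples):

```python
# Kick off a sync without blocking, then wait up to 30 minutes for it to finish:
sync_result = connection.run_sync(wait=False)
status = sync_result.wait_for_completion(
    wait_timeout=1800,
    raise_timeout=False,  # Return the non-final status instead of raising
    raise_failure=True,   # Raise if the job ends in a failed state
)
print(status)
```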
```python
def get_previous_sync_logs(self, *, limit: int = 10) -> list[SyncResult]:
```
Get the previous sync logs for a connection.
```python
def get_sync_result(self, job_id: int | None = None) -> SyncResult | None:
```
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
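For example, handling the `None` case explicitly (the job ID shown is illustrative):

```python
# The most recent job, if any:
sync_result = connection.get_sync_result()
if sync_result is None:
    print("No sync jobs found for this connection.")
else:
    print(sync_result.get_job_status())

# Or look up a specific job by ID (lazily loaded):
older_result = connection.get_sync_result(job_id=12345)
```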
```python
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
```
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
        E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the set of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.
**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `CloudWorkspace` and `CloudConnection` objects.
```python
@property
def job_url(self) -> str:
```
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
```python
def is_job_complete(self) -> bool:
```
Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
```
Return the latest status of the sync job.
```python
@property
def bytes_synced(self) -> int:
```
Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
```
Return the number of records synced.
```python
@property
def start_time(self) -> datetime:
```
Return the start time of the sync job in UTC.
```python
def raise_failure_status(self, *, refresh_status: bool = False) -> None:
```
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
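For example, a sketch of catching the failure explicitly (assuming a `sync_result` from the earlier examples):

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    # Re-fetch the status from the API before checking:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as ex:
    print(f"Job {sync_result.job_id} failed: {ex}")
```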
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
```
Wait for a job to finish running.
```python
def get_sql_cache(self) -> CacheBase:
```
Return a SQL cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
```
Return a SQL Engine for querying a SQL-based destination.
```python
def get_sql_table_name(self, stream_name: str) -> str:
```
Return the SQL table name of the named stream.
```python
def get_sql_table(self, stream_name: str) -> sqlalchemy.Table:
```
Return a SQLAlchemy table object for the named stream.
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
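For example, a sketch of local analysis of a synced stream (the stream name is illustrative):

```python
# Read the synced `users` stream and analyze it locally as a DataFrame.
dataset = sync_result.get_dataset("users")
df = dataset.to_pandas()  # CachedDataset also supports iteration and SQL access
print(df.head())
```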
```python
def get_sql_database_name(self) -> str:
```
Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
```
Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
```
Return the set of stream names.
```python
@final
@property
def streams(self) -> _SyncResultStreams:
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
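Because `streams` behaves like a read-only mapping, you can iterate over it directly. For example:

```python
# Iterate over all synced streams as cached datasets:
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {len(list(dataset))} records")
```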
```python
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
```
An enumeration of sync job statuses returned by the Airbyte Cloud API.
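For example, comparing a status returned by `SyncResult.get_job_status()` against the enum members:

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status is JobStatusEnum.SUCCEEDED:
    print("Sync finished successfully.")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED, JobStatusEnum.INCOMPLETE):
    print(f"Sync did not succeed: {status.value}")
```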