# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)
```
A remote workspace on the Airbyte Cloud.

By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
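For instance, a workspace object can be constructed with credentials pulled from secrets. A minimal sketch, where the secret names and the self-managed `api_root` URL are illustrative assumptions:

```python
import airbyte as ab
from airbyte.cloud import CloudWorkspace

# Plain strings are accepted; __post_init__ wraps them in SecretString.
workspace = CloudWorkspace(
    workspace_id="my-workspace-id",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# For a self-managed (OSS or Enterprise) instance, override `api_root`:
self_managed = CloudWorkspace(
    workspace_id="my-workspace-id",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
    api_root="https://airbyte.internal.example.com/api/public/v1",
)

workspace.connect()  # Optional sanity check; raises if unreachable.
```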
```python
@property
def workspace_url(self) -> str | None:
    """The web URL of the workspace."""
    return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
```
The web URL of the workspace.
```python
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: It is not necessary to call this method before calling other operations. It
    serves primarily as a simple check to ensure that the workspace is reachable
    and credentials are correct.
    """
    _ = api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    This method does not fetch data from the API. It returns a `CloudConnection` object,
    which will be loaded lazily as needed.
    """
    return CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
```
Get a connection by ID.

This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Get a source by ID.

    This method does not fetch data from the API. It returns a `CloudSource` object,
    which will be loaded lazily as needed.
    """
    return CloudSource(
        workspace=self,
        connector_id=source_id,
    )
```
Get a source by ID.

This method does not fetch data from the API. It returns a `CloudSource` object, which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Get a destination by ID.

    This method does not fetch data from the API. It returns a `CloudDestination` object,
    which will be loaded lazily as needed.
    """
    return CloudDestination(
        workspace=self,
        connector_id=destination_id,
    )
```
Get a destination by ID.

This method does not fetch data from the API. It returns a `CloudDestination` object, which will be loaded lazily as needed.
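Because these getters are lazy, they are cheap to call: no API request is made until a property that needs remote data is accessed. A short sketch with placeholder IDs:

```python
# None of these calls hit the API yet:
connection = workspace.get_connection(connection_id="456")
source = workspace.get_source(source_id="src-123")
destination = workspace.get_destination(destination_id="dst-789")

# Accessing a property that needs remote data triggers the fetch:
print(connection.name)
```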
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
    source_config_dict["sourceType"] = source.name.replace("source-", "")

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_sources(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="source",
                resource_name=name,
            )

    deployed_source = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=source_config_dict,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudSource(
        workspace=self,
        connector_id=deployed_source.source_id,
    )
```
Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:

- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
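As a usage sketch, a locally configured connector can be pushed to the workspace. The `source-faker` connector and its `count` option are assumptions used for illustration:

```python
import airbyte as ab

source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    random_name_suffix=True,  # Appends " (ID: <random suffix>)" to the name.
)
print(cloud_source.connector_id)
```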
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
            dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
    else:
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_destinations(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="destination",
                resource_name=name,
            )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
```
Deploy a destination to the workspace.

Returns the newly deployed destination.

Arguments:

- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
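When passing a raw configuration dictionary instead of a `Destination` object, the `destinationType` key is required, as the validation above shows. A hedged sketch; the remaining keys depend on the connector's configuration spec and are placeholders here:

```python
cloud_destination = workspace.deploy_destination(
    name="My Warehouse",
    destination={
        "destinationType": "snowflake",  # Required when passing a dict.
        # ...remaining keys follow the connector's configuration spec...
    },
    random_name_suffix=True,
)
print(cloud_destination.connector_id)
```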
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID as a `str` or a deployed `CloudSource` object.
    """
    if not isinstance(source, (str, CloudSource)):
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=source.connector_id if isinstance(source, CloudSource) else source,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
```
Delete a source from the workspace.

You can pass either the source ID as a `str` or a deployed `CloudSource` object.
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        destination_id=(
            destination if isinstance(destination, str) else destination.destination_id
        ),
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
```
Delete a deployed destination from the workspace.

You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a new connection between an already deployed source and destination.

    Returns the newly deployed connection object.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a CloudSource object.
        destination: The deployed destination. You can pass a destination ID or a
            CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
        selected_streams: The selected stream names to sync within the connection.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    source_id: str = source if isinstance(source, str) else source.connector_id
    destination_id: str = (
        destination if isinstance(destination, str) else destination.connector_id
    )

    deployed_connection = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    return CloudConnection(
        workspace=self,
        connection_id=deployed_connection.connection_id,
        source=deployed_connection.source_id,
        destination=deployed_connection.destination_id,
    )
```
Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:

- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a `CloudSource` object.
- destination: The deployed destination. You can pass a destination ID or a `CloudDestination` object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
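Putting the deploy steps together, a connection can be created from previously deployed connectors. A sketch reusing the `cloud_source` and `cloud_destination` objects from the examples above:

```python
connection = workspace.deploy_connection(
    connection_name="Faker to Warehouse",
    source=cloud_source,            # Or a source ID string.
    destination=cloud_destination,  # Or a destination ID string.
    selected_streams=["users", "products"],  # Required and must be non-empty.
    table_prefix="faker_",
)
print(connection.connection_url)
```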
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete a deployed connection from the workspace."""
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    api_util.delete_connection(
        connection_id=connection.connection_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    if cascade_delete_source:
        self.permanently_delete_source(source=connection.source_id)
    if cascade_delete_destination:
        self.permanently_delete_destination(destination=connection.destination_id)
```
Delete a deployed connection from the workspace.
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
    """List connections by name in the workspace.

    TODO: Add pagination support
    """
    connections = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            connection_response=connection,
        )
        for connection in connections
        if name is None or connection.name == name
    ]
```
List connections by name in the workspace.
TODO: Add pagination support
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
    """List all sources in the workspace.

    TODO: Add pagination support
    """
    sources = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            source_response=source,
        )
        for source in sources
        if name is None or source.name == name
    ]
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace.

    TODO: Add pagination support
    """
    destinations = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            destination_response=destination,
        )
        for destination in destinations
        if name is None or destination.name == name
    ]
```
List all destinations in the workspace.
TODO: Add pagination support
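The listing helpers accept either an exact `name` match or a `name_filter` predicate (assumed here to be a `Callable[[str], bool]` over the resource name). A short sketch:

```python
# Exact-name match:
exact_matches = workspace.list_sources(name="My Faker Source")

# Arbitrary predicate over the name:
dev_connections = workspace.list_connections(
    name_filter=lambda name: name.startswith("dev-"),
)
for conn in dev_connections:
    print(conn.connection_id, conn.name)
```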
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
        return result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """It is not recommended to create a `CloudConnection` object directly.

    Instead, use `CloudWorkspace.get_connection()` to create a connection object.
    """
    self.connection_id = connection_id
    """The ID of the connection."""

    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self._source_id = source
    """The ID of the source."""

    self._destination_id = destination
    """The ID of the destination."""

    self._connection_info: ConnectionResponse | None = None
    """The connection info object. (Cached.)"""

    self._cloud_source_object: CloudSource | None = None
    """The source object. (Cached.)"""

    self._cloud_destination_object: CloudDestination | None = None
    """The destination object. (Cached.)"""
```
It is not recommended to create a `CloudConnection` object directly.

Instead, use `CloudWorkspace.get_connection()` to create a connection object.
```python
@property
def name(self) -> str | None:
    """Get the display name of the connection, if available.

    E.g. "My Postgres to Snowflake", not the connection ID.
    """
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    return self._connection_info.name
```
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
```python
@property
def source_id(self) -> str:
    """The ID of the source."""
    if not self._source_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        self._source_id = self._connection_info.source_id

    return cast("str", self._source_id)
```
The ID of the source.
```python
@property
def source(self) -> CloudSource:
    """Get the source object."""
    if self._cloud_source_object:
        return self._cloud_source_object

    self._cloud_source_object = CloudSource(
        workspace=self.workspace,
        connector_id=self.source_id,
    )
    return self._cloud_source_object
```
Get the source object.
```python
@property
def destination_id(self) -> str:
    """The ID of the destination."""
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        self._destination_id = self._connection_info.destination_id

    return cast("str", self._destination_id)
```
The ID of the destination.
```python
@property
def destination(self) -> CloudDestination:
    """Get the destination object."""
    if self._cloud_destination_object:
        return self._cloud_destination_object

    self._cloud_destination_object = CloudDestination(
        workspace=self.workspace,
        connector_id=self.destination_id,
    )
    return self._cloud_destination_object
```
Get the destination object.
```python
@property
def stream_names(self) -> list[str]:
    """The stream names."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    return [stream.name for stream in self._connection_info.configurations.streams or []]
```
The stream names.
```python
@property
def table_prefix(self) -> str:
    """The table prefix."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    return self._connection_info.prefix or ""
```
The table prefix.
```python
@property
def connection_url(self) -> str | None:
    """The web URL to the connection."""
    return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
```
The web URL to the connection.
```python
@property
def job_history_url(self) -> str | None:
    """The URL to the job history for the connection."""
    return f"{self.connection_url}/timeline"
```
The URL to the job history for the connection.
```python
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Run a sync."""
    connection_response = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        workspace_id=self.workspace.workspace_id,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
    )
    sync_result = SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=connection_response.job_id,
    )

    if wait:
        sync_result.wait_for_completion(
            wait_timeout=wait_timeout,
            raise_failure=True,
            raise_timeout=True,
        )

    return sync_result
```
Run a sync.
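By default, `run_sync()` blocks until the job reaches a final status (or `wait_timeout` seconds elapse). To fire off a job and poll later, pass `wait=False` and use the returned `SyncResult`:

```python
sync_result = connection.run_sync(wait=False)

# ...do other work, then block until the job finishes,
# raising on failure or timeout:
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,
)
print(final_status)
```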
```python
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Get the previous sync logs for a connection."""
    sync_logs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        workspace_id=self.workspace.workspace_id,
        limit=limit,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
    )
    return [
        SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=sync_log.job_id,
            _latest_job_info=sync_log,
        )
        for sync_log in sync_logs
    ]
```
Get the previous sync logs for a connection.
```python
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Get the sync result for the connection.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if `job_id` is omitted and no previous jobs are found.
    """
    if job_id is None:
        # Get the most recent sync job
        results = self.get_previous_sync_logs(
            limit=1,
        )
        if results:
            return results[0]

        return None

    # Get the sync job by ID (lazy loaded)
    return SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=job_id,
    )
```
Get the sync result for the connection.

If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
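For example, to inspect the most recent job without triggering a new sync:

```python
latest = connection.get_sync_result()
if latest is None:
    print("No previous sync jobs found for this connection.")
else:
    print(latest.job_id, latest.get_job_status())
```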
```python
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(self.source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(self.destination_id)
```
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001 # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._job_with_attempts_info

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.

**This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by interacting with the `CloudWorkspace` and `CloudConnection` objects.
```python
@property
def job_url(self) -> str:
    """Return the URL of the sync job.

    Note: This currently returns the connection's job history URL, as there is no direct URL
    to a specific job in the Airbyte Cloud web app.

    TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
    E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
    """
    return f"{self.connection.job_history_url}"
```
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
```python
def is_job_complete(self) -> bool:
    """Check if the sync job is complete."""
    return self.get_job_status() in FINAL_STATUSES
```
Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
    """Return the latest status of the sync job."""
    return self._fetch_latest_job_info().status
```
Return the latest status of the sync job.
```python
@property
def bytes_synced(self) -> int:
    """Return the number of bytes synced."""
    return self._fetch_latest_job_info().bytes_synced or 0
```
Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
    """Return the number of records synced."""
    return self._fetch_latest_job_info().rows_synced or 0
```
Return the number of records synced.
```python
@property
def start_time(self) -> datetime:
    """Return the start time of the sync job in UTC."""
    try:
        return ab_datetime_parse(self._fetch_latest_job_info().start_time)
    except (ValueError, TypeError) as e:
        if "Invalid isoformat string" in str(e):
            job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                api_root=self.workspace.api_root,
                path="/jobs/get",
                json={"id": self.job_id},
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
            )
            raw_start_time = job_info_raw.get("startTime")
            if raw_start_time:
                return ab_datetime_parse(raw_start_time)
        raise
```
Return the start time of the sync job in UTC.
```python
def get_attempts(self) -> list[SyncAttempt]:
    """Return a list of attempts for this sync job."""
    job_with_attempts = self._fetch_job_with_attempts()
    attempts_data = job_with_attempts.get("attempts", [])

    return [
        SyncAttempt(
            workspace=self.workspace,
            connection=self.connection,
            job_id=self.job_id,
            attempt_number=i,
            _attempt_data=attempt_data,
        )
        for i, attempt_data in enumerate(attempts_data, start=0)
    ]
```
Return a list of attempts for this sync job.
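A short sketch of inspecting attempts, using only the `SyncAttempt` fields visible in the constructor call above:

```python
for attempt in sync_result.get_attempts():
    print(f"Job {attempt.job_id}, attempt #{attempt.attempt_number}")
```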
```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
    """Raise an exception if the sync job failed.

    By default, this method will use the latest status available. If you want to refresh the
    status before checking for failure, set `refresh_status=True`. If the job has failed, this
    method will raise an `AirbyteConnectionSyncError`.

    Otherwise, do nothing.
    """
    if not refresh_status and self._latest_job_info:
        latest_status = self._latest_job_info.status
    else:
        latest_status = self.get_job_status()

    if latest_status in FAILED_STATUSES:
        raise AirbyteConnectionSyncError(
            workspace=self.workspace,
            connection_id=self.connection.connection_id,
            job_id=self.job_id,
            job_status=self.get_job_status(),
        )
```
Raise an exception if the sync job failed.

By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
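For example, failures can be surfaced explicitly after a status refresh; this sketch assumes `AirbyteConnectionSyncError` is importable from `airbyte.exceptions`:

```python
from airbyte.exceptions import AirbyteConnectionSyncError

try:
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError as err:
    print(f"Sync failed for job {sync_result.job_id}: {err}")
```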
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
    """Wait for a job to finish running."""
    start_time = time.time()
    while True:
        latest_status = self.get_job_status()
        if latest_status in FINAL_STATUSES:
            if raise_failure:
                # No-op if the job succeeded or is still running:
                self.raise_failure_status()

            return latest_status

        if time.time() - start_time > wait_timeout:
            if raise_timeout:
                raise AirbyteConnectionSyncTimeoutError(
                    workspace=self.workspace,
                    connection_id=self.connection.connection_id,
                    job_id=self.job_id,
                    job_status=latest_status,
                    timeout=wait_timeout,
                )

            return latest_status  # This will be a non-final status

        time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
```
Wait for a job to finish running.
```python
def get_sql_cache(self) -> CacheBase:
    """Return a SQL cache object for working with the data in a SQL-based destination."""
    if self._cache:
        return self._cache

    destination_configuration = self._get_destination_configuration()
    self._cache = destination_to_cache(destination_configuration=destination_configuration)
    return self._cache
```
Return a SQL cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
    """Return a SQL Engine for querying a SQL-based destination."""
    return self.get_sql_cache().get_sql_engine()
```
Return a SQL Engine for querying a SQL-based destination.
```python
def get_sql_table_name(self, stream_name: str) -> str:
    """Return the SQL table name of the named stream."""
    return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
```
Return the SQL table name of the named stream.
```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
    """Return a SQLAlchemy table object for the named stream."""
    return self.get_sql_cache().processor.get_sql_table(stream_name)
```
Return a SQLAlchemy table object for the named stream.
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
    """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

    This can be used to read and analyze the data in a SQL-based destination.

    TODO: In a future iteration, we can consider providing stream configuration information
    (catalog information) to the `CachedDataset` object via the "Get stream properties"
    API: https://reference.airbyte.com/reference/getstreamproperties
    """
    return CachedDataset(
        self.get_sql_cache(),
        stream_name=stream_name,
        stream_configuration=False,  # Don't look for stream configuration in cache.
    )
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
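Building on the module-level example, a dataset retrieved this way supports direct iteration as well as SQL access:

```python
dataset = sync_result.get_dataset("users")

# Iterate over records directly:
for record in dataset:
    print(record)

# Or work with the underlying SQLAlchemy table:
users_table = dataset.to_sql_table()
print(users_table.name)
```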
```python
def get_sql_database_name(self) -> str:
    """Return the SQL database name."""
    cache = self.get_sql_cache()
    return cache.get_database_name()
```
Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
    """Return the SQL schema name."""
    cache = self.get_sql_cache()
    return cache.schema_name
```
Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
    """Return the set of stream names."""
    return self.connection.stream_names
```
Return the set of stream names.
```python
@final
@property
def streams(
    self,
) -> _SyncResultStreams:
    """Return a mapping of stream names to `airbyte.CachedDataset` objects.

    This is a convenience wrapper around the `stream_names`
    property and `get_dataset()` method.
    """
    return self._SyncResultStreams(self)
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
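Because `streams` behaves like a read-only mapping, it supports the usual dict-style access:

```python
print(list(sync_result.streams))      # Stream names, via `stream_names`.
users = sync_result.streams["users"]  # A `CachedDataset`, via `get_dataset()`.
```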
```python
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
```
An enumeration of job statuses returned by the Airbyte Cloud API.
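Because the enum inherits from `str`, members compare equal to their plain string values, which makes status checks convenient:

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.get_job_status()
if status is JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif status == "failed":  # Plain-string comparison also works for a str-based enum.
    print("Sync failed.")
```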