# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
````python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Usage Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```

ℹ️ **Experimental Features**

You can use the `airbyte.cloud.experimental` module to access experimental features.
These additional features are subject to change and may not be available in all environments.
"""  # noqa: RUF002  # Allow emoji

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
````
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
        serves primarily as a simple check to ensure that the workspace is reachable
        and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source.get_config().copy()
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object
                or a dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination.get_config().copy()
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either a deployed `CloudDestination` object or the deployed
        destination ID as a `str`.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace."""
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )

        if cascade_delete_source:
            self.permanently_delete_source(source=connection.source_id)
        if cascade_delete_destination:
            self.permanently_delete_destination(destination=connection.destination_id)

    # List sources, destinations, and connections

    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace."""
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudConnection(
                workspace=self,
                connection_id=connection.connection_id,
                source=None,
                destination=None,
            )
            for connection in connections
            if name is None or connection.name == name
        ]

    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace."""
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudSource(
                workspace=self,
                connector_id=source.source_id,
            )
            for source in sources
            if name is None or source.name == name
        ]

    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace."""
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CloudDestination(
                workspace=self,
                connector_id=destination.destination_id,
            )
            for destination in destinations
            if name is None or destination.name == name
        ]
```
A remote workspace on the Airbyte Cloud.

By overriding `api_root`, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.
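For example, a minimal sketch of pointing the workspace at a self-managed instance (the API root URL and secret names below are placeholders, not values shipped with the library):

```python
import airbyte as ab
from airbyte.cloud import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    # Hypothetical self-managed API root; omit to use Airbyte Cloud:
    api_root="https://airbyte.internal.example.com/api/public/v1",
)
```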
```python
@property
def workspace_url(self) -> str | None:
```
The URL of the workspace.
```python
def connect(self) -> None:
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
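For example, a quick credentials check (reusing the `workspace` object from above):

```python
# Raises an exception if the workspace is unreachable or the credentials
# are invalid; prints a confirmation message otherwise.
workspace.connect()
```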
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
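As an illustration, here is a hedged sketch that deploys the `source-faker` connector; the source name and config are example values, not requirements of this method:

```python
import airbyte as ab

# Configure a local source object first...
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)

# ...then deploy it under a unique name with a random suffix.
cloud_source = workspace.deploy_source(
    name="my-faker-source",
    source=source,
    unique=True,
    random_name_suffix=True,
)
```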
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
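Because a plain configuration dictionary is also accepted, a minimal sketch might look like the following; the `destinationType` key is required in that case, and the config values shown are hypothetical placeholders:

```python
# Deploy from a raw config dict; `destinationType` is mandatory here.
cloud_destination = workspace.deploy_destination(
    name="my-duckdb-destination",
    destination={
        "destinationType": "duckdb",
        "destination_path": "/tmp/airbyte.duckdb",  # Placeholder path
    },
)
```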
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
) -> None:
```
Delete a source from the workspace.
You can pass either the source ID `str` or a deployed `CloudSource` object.
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
) -> None:
```
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the deployed destination ID as a `str`.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
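Putting the pieces together, a sketch that wires the source and destination deployed in the earlier examples into a new connection (the stream names assume the `source-faker` example above):

```python
connection = workspace.deploy_connection(
    connection_name="faker-to-duckdb",
    source=cloud_source,  # Or pass the source ID as a `str`
    destination=cloud_destination,  # Or pass the destination ID as a `str`
    selected_streams=["users", "products", "purchases"],
    table_prefix="faker_",
)
```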
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
```
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object, which will be loaded lazily as needed.
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
```
Delete a deployed connection from the workspace.
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
```
List connections by name in the workspace.
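For example, assuming `name_filter` is a predicate over each connection name (a sketch, not a guaranteed contract):

```python
# Exact-name lookup:
prod_syncs = workspace.list_connections(name="prod-users-sync")

# Or filter with a predicate over the name:
all_prod = workspace.list_connections(
    name_filter=lambda name: name.startswith("prod-"),
)
```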
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
```
List all sources in the workspace.
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
```
List all destinations in the workspace.
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
```
It is not recommended to create a `CloudConnection` object directly. Instead, use `CloudWorkspace.get_connection()` to create a connection object.
```python
@property
def source_id(self) -> str:
```
The ID of the source.
```python
@property
def source(self) -> CloudSource:
```
Get the source object.
```python
@property
def destination_id(self) -> str:
```
The ID of the destination.
```python
@property
def destination(self) -> CloudDestination:
```
Get the destination object.
```python
@property
def stream_names(self) -> list[str]:
```
The stream names.
```python
@property
def table_prefix(self) -> str:
```
The table prefix.
```python
@property
def connection_url(self) -> str | None:
```
The URL to the connection.
```python
@property
def job_history_url(self) -> str | None:
```
The URL to the job history for the connection.
```python
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
```
Run a sync.
```python
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
```
Get the previous sync logs for a connection.
```python
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
```
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.

Returns `None` if `job_id` is omitted and no previous jobs are found.
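For example, a short sketch that inspects the most recent sync job, continuing from the earlier examples:

```python
sync_result = connection.get_sync_result()
if sync_result is None:
    print("No sync jobs found for this connection.")
else:
    print(f"Job {sync_result.job_id}: {sync_result.get_job_status()}")
```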
```python
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
```
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job."""
        return f"{self.connection.job_history_url}/{self.job_id}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        # Parse from ISO 8601 format:
        return datetime.fromisoformat(self._fetch_latest_job_info().start_time)

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the set of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a `SyncResult` by interacting with the `CloudWorkspace` and `CloudConnection` objects.
```python
@property
def job_url(self) -> str:
```
Return the URL of the sync job.
```python
def is_job_complete(self) -> bool:
```
Check if the sync job is complete.
```python
def get_job_status(self) -> JobStatusEnum:
```
Return the status of the sync job.
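The returned value can be compared against `JobStatusEnum` members, for example:

```python
from airbyte.cloud import JobStatusEnum

if sync_result.get_job_status() == JobStatusEnum.SUCCEEDED:
    print(f"Synced {sync_result.records_synced} records.")
```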
```python
@property
def bytes_synced(self) -> int:
```
Return the number of bytes synced.
```python
@property
def records_synced(self) -> int:
```
Return the number of records processed.
```python
@property
def start_time(self) -> datetime:
```
Return the start time of the sync job in UTC.
```python
def raise_failure_status(
    self,
    *,
    refresh_status: bool = False,
) -> None:
```
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set `refresh_status=True`. If the job has failed, this method will raise an `AirbyteConnectionSyncError`.

Otherwise, do nothing.
```python
def wait_for_completion(
    self,
    *,
    wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
    raise_timeout: bool = True,
    raise_failure: bool = False,
) -> JobStatusEnum:
```
Wait for a job to finish running.
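For example, a sketch that waits with a custom timeout and handles a non-final status instead of raising:

```python
from airbyte.cloud import JobStatusEnum

status = sync_result.wait_for_completion(
    wait_timeout=600,  # Wait up to 10 minutes
    raise_timeout=False,  # Return the current status instead of raising
    raise_failure=False,
)
if status == JobStatusEnum.RUNNING:
    print("Job is still running; check back later.")
```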
```python
def get_sql_cache(self) -> CacheBase:
```
Return a SQL cache object for working with the data in a SQL-based destination.
```python
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
```
Return a SQL Engine for querying a SQL-based destination.
```python
def get_sql_table_name(self, stream_name: str) -> str:
```
Return the SQL table name of the named stream.
```python
def get_sql_table(
    self,
    stream_name: str,
) -> sqlalchemy.Table:
```
Return a SQLAlchemy table object for the named stream.
```python
def get_dataset(self, stream_name: str) -> CachedDataset:
```
Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the `CachedDataset` object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
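For example, since `CachedDataset` supports the standard PyAirbyte dataset interfaces, the synced records can be pulled into a pandas DataFrame (assuming a supported SQL destination such as Snowflake or BigQuery):

```python
dataset = sync_result.get_dataset("users")

# Convert to a pandas DataFrame for analysis:
users_df = dataset.to_pandas()
print(users_df.head())
```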
```python
def get_sql_database_name(self) -> str:
```
Return the SQL database name.
```python
def get_sql_schema_name(self) -> str:
```
Return the SQL schema name.
```python
@property
def stream_names(self) -> list[str]:
```
Return the set of stream names.
```python
@final
@property
def streams(self) -> _SyncResultStreams:
```
Return a mapping of stream names to `airbyte.CachedDataset` objects.

This is a convenience wrapper around the `stream_names` property and `get_dataset()` method.
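For example, because `streams` behaves like a standard mapping, you can iterate over every synced stream:

```python
for stream_name, dataset in sync_result.streams.items():
    print(f"{stream_name}: {len(list(dataset))} records")
```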
```python
from enum import Enum


class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
```
An enumeration of Airbyte Cloud sync job statuses.