# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
```
```python
@dataclass
class CloudWorkspace:
    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)
```
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
```python
@property
def workspace_url(self) -> str | None:
```
The web URL of the workspace.
```python
def connect(self) -> None:
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
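For example, a minimal sketch of creating a workspace object and verifying credentials (the IDs, secret names, and self-managed URL below are placeholders, not values from this API):

```python
import airbyte as ab
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    # Optionally point at a self-managed (OSS or Enterprise) instance instead of Airbyte Cloud.
    # api_root="https://airbyte.internal.example.com/api/public/v1",  # example URL only
)

# Optional sanity check: raises if the workspace is unreachable or the credentials are wrong.
workspace.connect()
```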
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
```
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
```
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
```
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
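A short sketch of the lazy getters, assuming a `workspace` object as above (the IDs are placeholders):

```python
# None of these calls hit the API yet; objects are loaded lazily as needed.
connection = workspace.get_connection(connection_id="456")
source = workspace.get_source(source_id="789")
destination = workspace.get_destination(destination_id="012")

# Accessing a property such as the connection name triggers the actual API lookup.
print(connection.name)
```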
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
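As a rough sketch, deploying a locally configured source might look like this (the `source-faker` connector and its config are only an illustration, and `workspace` is assumed from earlier examples):

```python
import airbyte as ab

# Configure a source locally, then deploy it to the workspace.
source = ab.get_source(
    "source-faker",
    config={"count": 100},
)
cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    unique=True,  # Fail if a source with this name already exists.
)
print(cloud_source.connector_id)
```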
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte `Destination` object or a dictionary of configuration values.
- unique: Whether to require a unique name. If `True`, duplicate names are not allowed. Defaults to `True`.
- random_name_suffix: Whether to append a random suffix to the name.
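When passing a raw configuration dictionary, `destinationType` must be included. A hedged sketch (the DuckDB connector and its config keys are illustrative only):

```python
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",  # Required when passing a plain dict.
        "destination_path": "/local/airbyte.duckdb",  # Illustrative; use your destination's actual spec.
    },
)
print(cloud_destination.connector_id)
```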
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
) -> None:
```
Delete a source from the workspace.
You can pass either the source ID as a `str` or a deployed `CloudSource` object.
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
) -> None:
```
Delete a deployed destination from the workspace.
You can pass either a deployed `CloudDestination` object or the destination ID as a `str`.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
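Putting the pieces together, a connection might be deployed roughly like this (assuming `cloud_source` and `cloud_destination` were deployed as in the earlier sketches; names, streams, and prefix are placeholders):

```python
connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,            # or a source ID string
    destination=cloud_destination,  # or a destination ID string
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
print(connection.connection_url)
```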
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
```
Delete a deployed connection from the workspace.
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
```
List connections in the workspace, optionally filtering by name.
TODO: Add pagination support
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
```
List all destinations in the workspace.
TODO: Add pagination support
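Each of the `list_*` helpers accepts an exact `name` or a `name_filter` callable; a small sketch (the names are placeholders, and the predicate assumes the filter receives each candidate resource name):

```python
# Exact-name match:
matches = workspace.list_sources(name="My Faker Source")

# Or a name predicate (assumed semantics: called with each resource name).
nightly = workspace.list_connections(name_filter=lambda n: n.startswith("Faker"))

for connection in nightly:
    print(connection.connection_id, connection.name)
```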
```python
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
) -> CustomCloudSourceDefinition:
```
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
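A hedged sketch of publishing a YAML (low-code) definition from a manifest file (the file path and definition name are placeholders):

```python
from pathlib import Path

definition = workspace.publish_custom_source_definition(
    name="My Custom Source",
    manifest_yaml=Path("manifest.yaml"),  # May also be a dict or a YAML string.
    unique=True,
    pre_validate=True,  # Validate the manifest client-side before publishing.
)
print(definition.name)
```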
```python
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
```
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
```python
def get_custom_source_definition(
    self,
    definition_id: str,
    *,
    definition_type: Literal["yaml", "docker"],
) -> CustomCloudSourceDefinition:
```
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
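For example, to look up existing YAML definitions and fetch one by ID (the ID is a placeholder):

```python
# List all YAML-based custom source definitions in the workspace.
for definition in workspace.list_custom_source_definitions(definition_type="yaml"):
    print(definition.name)

# Fetch a single definition by its ID.
definition = workspace.get_custom_source_definition(
    "def-123",
    definition_type="yaml",
)
```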
```python
class CloudConnection:
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
```
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
```python
@property
def name(self) -> str | None:
```
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
```python
@property
def source_id(self) -> str:
```
The ID of the source.
```python
@property
def source(self) -> CloudSource:
```
Get the source object.
```python
@property
def destination_id(self) -> str:
```
The ID of the destination.
```python
@property
def destination(self) -> CloudDestination:
```
Get the destination object.
```python
@property
def stream_names(self) -> list[str]:
```
The stream names.
```python
@property
def table_prefix(self) -> str:
```
The table prefix.
```python
@property
def connection_url(self) -> str | None:
```
The web URL to the connection.
```python
@property
def job_history_url(self) -> str | None:
```
The URL to the job history for the connection.
```python
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
```
Run a sync.
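By default `run_sync()` blocks until the job finishes. A sketch of starting a job without waiting and checking on it later (the timeout value is arbitrary):

```python
# Kick off the job without blocking.
sync_result = connection.run_sync(wait=False)

# ...do other work, then check on it.
if not sync_result.is_job_complete():
    sync_result.wait_for_completion(wait_timeout=600, raise_failure=True, raise_timeout=True)

print(sync_result.get_job_status())
```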
```python
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
```
Get the previous sync logs for a connection.
```python
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
```
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
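For example, to inspect the most recent job for a connection:

```python
sync_result = connection.get_sync_result()  # Most recent job, or None if none exist.
if sync_result is not None:
    print(sync_result.get_job_status())
    print(sync_result.records_synced, "records /", sync_result.bytes_synced, "bytes")
    print(sync_result.job_url)
```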
```python
def rename(self, name: str) -> CloudConnection:
```
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
```python
def set_table_prefix(self, prefix: str) -> CloudConnection:
```
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
```python
def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
```
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
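A minimal sketch, assuming an existing `connection` object; the stream names are illustrative:

```python
# Assuming we've already created a `connection` object...

# Replace the stream selection with exactly these two streams.
# Caution: this is destructive and overwrites the existing selection.
connection = connection.set_selected_streams(["users", "orders"])
```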
324 def permanently_delete( 325 self, 326 *, 327 cascade_delete_source: bool = False, 328 cascade_delete_destination: bool = False, 329 ) -> None: 330 """Delete the connection. 331 332 Args: 333 cascade_delete_source: Whether to also delete the source. 334 cascade_delete_destination: Whether to also delete the destination. 335 """ 336 self.workspace.permanently_delete_connection(self) 337 338 if cascade_delete_source: 339 self.workspace.permanently_delete_source(self.source_id) 340 341 if cascade_delete_destination: 342 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
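A minimal sketch, assuming an existing `connection` object. Note that this operation is irreversible; the cascade flags are shown for illustration only:

```python
# Assuming we've already created a `connection` object...

# Delete the connection and also tear down its source and destination
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=True,
)
```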
218@dataclass 219class SyncResult: 220 """The result of a sync operation. 221 222 **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by 223 interacting with the `.CloudWorkspace` and `.CloudConnection` objects. 224 """ 225 226 workspace: CloudWorkspace 227 connection: CloudConnection 228 job_id: int 229 table_name_prefix: str = "" 230 table_name_suffix: str = "" 231 _latest_job_info: JobResponse | None = None 232 _connection_response: ConnectionResponse | None = None 233 _cache: CacheBase | None = None 234 _job_with_attempts_info: dict[str, Any] | None = None 235 236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}" 247 248 def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: 249 """Return connection info for the sync job.""" 250 if self._connection_response and not force_refresh: 251 return self._connection_response 252 253 self._connection_response = api_util.get_connection( 254 workspace_id=self.workspace.workspace_id, 255 api_root=self.workspace.api_root, 256 connection_id=self.connection.connection_id, 257 client_id=self.workspace.client_id, 258 client_secret=self.workspace.client_secret, 259 ) 260 return self._connection_response 261 262 def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]: 263 """Return the destination configuration for the sync job.""" 264 connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh) 265 destination_response = api_util.get_destination( 266 destination_id=connection_info.destination_id, 267 api_root=self.workspace.api_root, 268 client_id=self.workspace.client_id, 269 client_secret=self.workspace.client_secret, 270 ) 271 return asdict(destination_response.configuration) 272 273 def is_job_complete(self) -> bool: 274 """Check if the sync job is complete.""" 275 return self.get_job_status() in FINAL_STATUSES 276 277 def get_job_status(self) -> JobStatusEnum: 278 """Check if the sync job is still running.""" 279 return self._fetch_latest_job_info().status 280 281 def _fetch_latest_job_info(self) -> JobResponse: 282 """Return the job info for the sync job.""" 283 if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES: 284 return self._latest_job_info 285 286 self._latest_job_info = api_util.get_job_info( 287 job_id=self.job_id, 288 api_root=self.workspace.api_root, 289 client_id=self.workspace.client_id, 290 client_secret=self.workspace.client_secret, 291 ) 292 return self._latest_job_info 293 294 @property 295 def bytes_synced(self) -> int: 296 """Return the number of records processed.""" 297 return self._fetch_latest_job_info().bytes_synced or 0 298 299 @property 300 def records_synced(self) -> int: 301 """Return the number of records processed.""" 302 return self._fetch_latest_job_info().rows_synced or 0 303 304 @property 305 def start_time(self) -> datetime: 306 """Return the start time of the sync job in UTC.""" 307 try: 308 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 309 except (ValueError, TypeError) as e: 310 if "Invalid isoformat string" in str(e): 311 
job_info_raw = api_util._make_config_api_request( # noqa: SLF001 312 api_root=self.workspace.api_root, 313 path="/jobs/get", 314 json={"id": self.job_id}, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 ) 318 raw_start_time = job_info_raw.get("startTime") 319 if raw_start_time: 320 return ab_datetime_parse(raw_start_time) 321 raise 322 323 def _fetch_job_with_attempts(self) -> dict[str, Any]: 324 """Fetch job info with attempts from Config API using lazy loading pattern.""" 325 if self._job_with_attempts_info is not None: 326 return self._job_with_attempts_info 327 328 self._job_with_attempts_info = api_util._make_config_api_request( # noqa: SLF001 # Config API helper 329 api_root=self.workspace.api_root, 330 path="/jobs/get", 331 json={ 332 "id": self.job_id, 333 }, 334 client_id=self.workspace.client_id, 335 client_secret=self.workspace.client_secret, 336 ) 337 return self._job_with_attempts_info 338 339 def get_attempts(self) -> list[SyncAttempt]: 340 """Return a list of attempts for this sync job.""" 341 job_with_attempts = self._fetch_job_with_attempts() 342 attempts_data = job_with_attempts.get("attempts", []) 343 344 return [ 345 SyncAttempt( 346 workspace=self.workspace, 347 connection=self.connection, 348 job_id=self.job_id, 349 attempt_number=i, 350 _attempt_data=attempt_data, 351 ) 352 for i, attempt_data in enumerate(attempts_data, start=0) 353 ] 354 355 def raise_failure_status( 356 self, 357 *, 358 refresh_status: bool = False, 359 ) -> None: 360 """Raise an exception if the sync job failed. 361 362 By default, this method will use the latest status available. If you want to refresh the 363 status before checking for failure, set `refresh_status=True`. If the job has failed, this 364 method will raise a `AirbyteConnectionSyncError`. 365 366 Otherwise, do nothing. 
367 """ 368 if not refresh_status and self._latest_job_info: 369 latest_status = self._latest_job_info.status 370 else: 371 latest_status = self.get_job_status() 372 373 if latest_status in FAILED_STATUSES: 374 raise AirbyteConnectionSyncError( 375 workspace=self.workspace, 376 connection_id=self.connection.connection_id, 377 job_id=self.job_id, 378 job_status=self.get_job_status(), 379 ) 380 381 def wait_for_completion( 382 self, 383 *, 384 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 385 raise_timeout: bool = True, 386 raise_failure: bool = False, 387 ) -> JobStatusEnum: 388 """Wait for a job to finish running.""" 389 start_time = time.time() 390 while True: 391 latest_status = self.get_job_status() 392 if latest_status in FINAL_STATUSES: 393 if raise_failure: 394 # No-op if the job succeeded or is still running: 395 self.raise_failure_status() 396 397 return latest_status 398 399 if time.time() - start_time > wait_timeout: 400 if raise_timeout: 401 raise AirbyteConnectionSyncTimeoutError( 402 workspace=self.workspace, 403 connection_id=self.connection.connection_id, 404 job_id=self.job_id, 405 job_status=latest_status, 406 timeout=wait_timeout, 407 ) 408 409 return latest_status # This will be a non-final status 410 411 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS) 412 413 def get_sql_cache(self) -> CacheBase: 414 """Return a SQL Cache object for working with the data in a SQL-based destination's.""" 415 if self._cache: 416 return self._cache 417 418 destination_configuration = self._get_destination_configuration() 419 self._cache = destination_to_cache(destination_configuration=destination_configuration) 420 return self._cache 421 422 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 423 """Return a SQL Engine for querying a SQL-based destination.""" 424 return self.get_sql_cache().get_sql_engine() 425 426 def get_sql_table_name(self, stream_name: str) -> str: 427 """Return the SQL table name of the named stream.""" 428 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name) 429 430 def get_sql_table( 431 self, 432 stream_name: str, 433 ) -> sqlalchemy.Table: 434 """Return a SQLAlchemy table object for the named stream.""" 435 return self.get_sql_cache().processor.get_sql_table(stream_name) 436 437 def get_dataset(self, stream_name: str) -> CachedDataset: 438 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 439 440 This can be used to read and analyze the data in a SQL-based destination. 441 442 TODO: In a future iteration, we can consider providing stream configuration information 443 (catalog information) to the `CachedDataset` object via the "Get stream properties" 444 API: https://reference.airbyte.com/reference/getstreamproperties 445 """ 446 return CachedDataset( 447 self.get_sql_cache(), 448 stream_name=stream_name, 449 stream_configuration=False, # Don't look for stream configuration in cache. 
450 ) 451 452 def get_sql_database_name(self) -> str: 453 """Return the SQL database name.""" 454 cache = self.get_sql_cache() 455 return cache.get_database_name() 456 457 def get_sql_schema_name(self) -> str: 458 """Return the SQL schema name.""" 459 cache = self.get_sql_cache() 460 return cache.schema_name 461 462 @property 463 def stream_names(self) -> list[str]: 464 """Return the set of stream names.""" 465 return self.connection.stream_names 466 467 @final 468 @property 469 def streams( 470 self, 471 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 472 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 473 474 This is a convenience wrapper around the `stream_names` 475 property and `get_dataset()` method. 476 """ 477 return self._SyncResultStreams(self) 478 479 class _SyncResultStreams(Mapping[str, CachedDataset]): 480 """A mapping of stream names to cached datasets.""" 481 482 def __init__( 483 self, 484 parent: SyncResult, 485 /, 486 ) -> None: 487 self.parent: SyncResult = parent 488 489 def __getitem__(self, key: str) -> CachedDataset: 490 return self.parent.get_dataset(stream_name=key) 491 492 def __iter__(self) -> Iterator[str]: 493 return iter(self.parent.stream_names) 494 495 def __len__(self) -> int: 496 return len(self.parent.stream_names)
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.
236 @property 237 def job_url(self) -> str: 238 """Return the URL of the sync job. 239 240 Note: This currently returns the connection's job history URL, as there is no direct URL 241 to a specific job in the Airbyte Cloud web app. 242 243 TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. 244 E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true 245 """ 246 return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
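For example (assuming an existing `sync_result` object):

```python
# Assuming we've already obtained a `sync_result` object...

# Print the job history page for this connection
print(f"View sync logs at: {sync_result.job_url}")
```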
273 def is_job_complete(self) -> bool: 274 """Check if the sync job is complete.""" 275 return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
277 def get_job_status(self) -> JobStatusEnum: 278 """Return the latest status of the sync job.""" 279 return self._fetch_latest_job_info().status
Return the latest status of the sync job.
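A minimal sketch, assuming an existing `sync_result` object:

```python
# Assuming we've already obtained a `sync_result` object...
from airbyte.cloud import JobStatusEnum

if sync_result.is_job_complete():
    status = sync_result.get_job_status()
    if status == JobStatusEnum.SUCCEEDED:
        print("Sync succeeded.")
    else:
        print(f"Sync finished with status: {status}")
else:
    print("Sync is still in progress.")
```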
294 @property 295 def bytes_synced(self) -> int: 296 """Return the number of bytes synced.""" 297 return self._fetch_latest_job_info().bytes_synced or 0
Return the number of bytes synced.
299 @property 300 def records_synced(self) -> int: 301 """Return the number of records processed.""" 302 return self._fetch_latest_job_info().rows_synced or 0
Return the number of records processed.
304 @property 305 def start_time(self) -> datetime: 306 """Return the start time of the sync job in UTC.""" 307 try: 308 return ab_datetime_parse(self._fetch_latest_job_info().start_time) 309 except (ValueError, TypeError) as e: 310 if "Invalid isoformat string" in str(e): 311 job_info_raw = api_util._make_config_api_request( # noqa: SLF001 312 api_root=self.workspace.api_root, 313 path="/jobs/get", 314 json={"id": self.job_id}, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 ) 318 raw_start_time = job_info_raw.get("startTime") 319 if raw_start_time: 320 return ab_datetime_parse(raw_start_time) 321 raise
Return the start time of the sync job in UTC.
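Taken together, these properties make a simple post-sync summary; a sketch assuming an existing `sync_result` object for a completed job:

```python
# Assuming we've already obtained a `sync_result` object...

print(f"Started at:     {sync_result.start_time.isoformat()}")
print(f"Records synced: {sync_result.records_synced}")
print(f"Bytes synced:   {sync_result.bytes_synced}")
```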
339 def get_attempts(self) -> list[SyncAttempt]: 340 """Return a list of attempts for this sync job.""" 341 job_with_attempts = self._fetch_job_with_attempts() 342 attempts_data = job_with_attempts.get("attempts", []) 343 344 return [ 345 SyncAttempt( 346 workspace=self.workspace, 347 connection=self.connection, 348 job_id=self.job_id, 349 attempt_number=i, 350 _attempt_data=attempt_data, 351 ) 352 for i, attempt_data in enumerate(attempts_data, start=0) 353 ]
Return a list of attempts for this sync job.
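A minimal sketch, assuming an existing `sync_result` object; only the attributes shown in the constructor above (`job_id`, `attempt_number`) are used:

```python
# Assuming we've already obtained a `sync_result` object...

for attempt in sync_result.get_attempts():
    print(f"Job {attempt.job_id}, attempt #{attempt.attempt_number}")
```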
355 def raise_failure_status( 356 self, 357 *, 358 refresh_status: bool = False, 359 ) -> None: 360 """Raise an exception if the sync job failed. 361 362 By default, this method will use the latest status available. If you want to refresh the 363 status before checking for failure, set `refresh_status=True`. If the job has failed, this 364 method will raise a `AirbyteConnectionSyncError`. 365 366 Otherwise, do nothing. 367 """ 368 if not refresh_status and self._latest_job_info: 369 latest_status = self._latest_job_info.status 370 else: 371 latest_status = self.get_job_status() 372 373 if latest_status in FAILED_STATUSES: 374 raise AirbyteConnectionSyncError( 375 workspace=self.workspace, 376 connection_id=self.connection.connection_id, 377 job_id=self.job_id, 378 job_status=self.get_job_status(), 379 )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.
Otherwise, do nothing.
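A minimal sketch, assuming an existing `sync_result` object. The import path for `AirbyteConnectionSyncError` is assumed here; adjust it if the exception is exposed elsewhere:

```python
# Assuming we've already obtained a `sync_result` object...
from airbyte.exceptions import AirbyteConnectionSyncError  # Import path assumed

try:
    # Refresh the status first so we don't rely on a stale cached value
    sync_result.raise_failure_status(refresh_status=True)
except AirbyteConnectionSyncError:
    print(f"Sync failed; see logs at {sync_result.job_url}")
```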
381 def wait_for_completion( 382 self, 383 *, 384 wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, 385 raise_timeout: bool = True, 386 raise_failure: bool = False, 387 ) -> JobStatusEnum: 388 """Wait for a job to finish running.""" 389 start_time = time.time() 390 while True: 391 latest_status = self.get_job_status() 392 if latest_status in FINAL_STATUSES: 393 if raise_failure: 394 # No-op if the job succeeded or is still running: 395 self.raise_failure_status() 396 397 return latest_status 398 399 if time.time() - start_time > wait_timeout: 400 if raise_timeout: 401 raise AirbyteConnectionSyncTimeoutError( 402 workspace=self.workspace, 403 connection_id=self.connection.connection_id, 404 job_id=self.job_id, 405 job_status=latest_status, 406 timeout=wait_timeout, 407 ) 408 409 return latest_status # This will be a non-final status 410 411 time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
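A minimal sketch, assuming an existing `sync_result` object; the timeout value is illustrative:

```python
# Assuming we've already obtained a `sync_result` object...

# Wait up to 30 minutes; on timeout, return the non-final status instead of raising
final_status = sync_result.wait_for_completion(
    wait_timeout=1800,
    raise_timeout=False,
    raise_failure=True,
)
print(f"Job ended with status: {final_status}")
```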
413 def get_sql_cache(self) -> CacheBase: 414 """Return a SQL Cache object for working with the data in a SQL-based destination.""" 415 if self._cache: 416 return self._cache 417 418 destination_configuration = self._get_destination_configuration() 419 self._cache = destination_to_cache(destination_configuration=destination_configuration) 420 return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination.
422 def get_sql_engine(self) -> sqlalchemy.engine.Engine: 423 """Return a SQL Engine for querying a SQL-based destination.""" 424 return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
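Because this returns a standard SQLAlchemy engine, the destination can be queried directly. A sketch assuming a supported SQL destination and an illustrative `users` stream; identifier quoting may need adjusting for your destination's SQL dialect:

```python
# Assuming we've already obtained a `sync_result` object for a SQL destination...
import sqlalchemy

engine = sync_result.get_sql_engine()
schema_name = sync_result.get_sql_schema_name()
table_name = sync_result.get_sql_table_name("users")  # "users" is illustrative

with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {schema_name}.{table_name}")
    ).scalar_one()

print(f"{table_name}: {row_count} rows")
```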
426 def get_sql_table_name(self, stream_name: str) -> str: 427 """Return the SQL table name of the named stream.""" 428 return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
430 def get_sql_table( 431 self, 432 stream_name: str, 433 ) -> sqlalchemy.Table: 434 """Return a SQLAlchemy table object for the named stream.""" 435 return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
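A sketch assuming a supported SQL destination and an illustrative `users` stream:

```python
# Assuming we've already obtained a `sync_result` object for a SQL destination...
import sqlalchemy

users_table = sync_result.get_sql_table("users")  # "users" is illustrative
engine = sync_result.get_sql_engine()

with engine.connect() as conn:
    # Select a handful of rows using SQLAlchemy Core
    for row in conn.execute(sqlalchemy.select(users_table).limit(10)):
        print(row)
```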
437 def get_dataset(self, stream_name: str) -> CachedDataset: 438 """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name. 439 440 This can be used to read and analyze the data in a SQL-based destination. 441 442 TODO: In a future iteration, we can consider providing stream configuration information 443 (catalog information) to the `CachedDataset` object via the "Get stream properties" 444 API: https://reference.airbyte.com/reference/getstreamproperties 445 """ 446 return CachedDataset( 447 self.get_sql_cache(), 448 stream_name=stream_name, 449 stream_configuration=False, # Don't look for stream configuration in cache. 450 )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
      (catalog information) to the CachedDataset object via the "Get stream properties"
      API: https://reference.airbyte.com/reference/getstreamproperties
452 def get_sql_database_name(self) -> str: 453 """Return the SQL database name.""" 454 cache = self.get_sql_cache() 455 return cache.get_database_name()
Return the SQL database name.
457 def get_sql_schema_name(self) -> str: 458 """Return the SQL schema name.""" 459 cache = self.get_sql_cache() 460 return cache.schema_name
Return the SQL schema name.
462 @property 463 def stream_names(self) -> list[str]: 464 """Return the set of stream names.""" 465 return self.connection.stream_names
Return the set of stream names.
467 @final 468 @property 469 def streams( 470 self, 471 ) -> _SyncResultStreams: # pyrefly: ignore[unknown-name] 472 """Return a mapping of stream names to `airbyte.CachedDataset` objects. 473 474 This is a convenience wrapper around the `stream_names` 475 property and `get_dataset()` method. 476 """ 477 return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names
property and get_dataset() method.
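A minimal sketch, assuming a `sync_result` object for a supported SQL destination:

```python
# Assuming we've already obtained a `sync_result` object for a SQL destination...

# `streams` behaves like a read-only mapping of stream name -> CachedDataset
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream: {stream_name}")
    for record in dataset:
        print(record)  # Print only the first record of each stream
        break
```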
8class JobStatusEnum(str, Enum): 9 PENDING = 'pending' 10 RUNNING = 'running' 11 INCOMPLETE = 'incomplete' 12 FAILED = 'failed' 13 SUCCEEDED = 'succeeded' 14 CANCELLED = 'cancelled'
Enumeration of possible sync job statuses.
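Because the enum also subclasses `str`, members compare equal to their raw string values; a small sketch:

```python
from airbyte.cloud import JobStatusEnum

# Members are str-backed, so they compare equal to their raw string values
assert JobStatusEnum.SUCCEEDED == "succeeded"

# Illustrative membership check against a hand-picked set of statuses
if JobStatusEnum.RUNNING in {JobStatusEnum.PENDING, JobStatusEnum.RUNNING}:
    print("The job has not finished yet.")
```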