# airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

## Examples

### Basic Sync Example:

```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```

### Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the
`SyncResult` object. Currently this is supported in Snowflake and BigQuery only.

```python
# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.constants import JobStatusEnum
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import connections, constants, sync_results, workspaces


__all__ = [
    # Submodules
    "workspaces",
    "connections",
    "constants",
    "sync_results",
    # Classes
    "CloudWorkspace",
    "CloudConnection",
    "SyncResult",
    # Enums
    "JobStatusEnum",
]
```
```python
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    client_id: SecretString
    client_secret: SecretString
    api_root: str = api_util.CLOUD_API_ROOT

    def __post_init__(self) -> None:
        """Ensure that the client ID and secret are handled securely."""
        self.client_id = SecretString(self.client_id)
        self.client_secret = SecretString(self.client_secret)

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
```
A remote workspace on the Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
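A minimal construction sketch. The credential secret names and the self-managed URL below are placeholders, not required values:

```python
import airbyte as ab
from airbyte.cloud import CloudWorkspace

# Airbyte Cloud: only the workspace ID and API credentials are required.
workspace = CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
)

# Self-managed (OSS/Enterprise): point `api_root` at your own instance.
# The URL below is a placeholder, not a real endpoint.
self_managed = CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLIENT_SECRET"),
    api_root="https://airbyte.internal.example.com/api/public/v1",
)

# Optional sanity check that the workspace is reachable and credentials work.
workspace.connect()
```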
```python
@property
def workspace_url(self) -> str | None:
    """The web URL of the workspace."""
    return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
```
The web URL of the workspace.
```python
@property
def organization_id(self) -> str | None:
    """The ID of the organization this workspace belongs to.

    This value is cached after the first lookup.
    """
    return self._organization_info.get("organizationId")
```
The ID of the organization this workspace belongs to.
This value is cached after the first lookup.
```python
@property
def organization_name(self) -> str | None:
    """The name of the organization this workspace belongs to.

    This value is cached after the first lookup.
    """
    return self._organization_info.get("organizationName")
```
The name of the organization this workspace belongs to.
This value is cached after the first lookup.
```python
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: It is not necessary to call this method before calling other operations. It
    serves primarily as a simple check to ensure that the workspace is reachable
    and credentials are correct.
    """
    _ = api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
```
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
```python
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    This method does not fetch data from the API. It returns a `CloudConnection` object,
    which will be loaded lazily as needed.
    """
    return CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
```
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
```python
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Get a source by ID.

    This method does not fetch data from the API. It returns a `CloudSource` object,
    which will be loaded lazily as needed.
    """
    return CloudSource(
        workspace=self,
        connector_id=source_id,
    )
```
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
```python
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Get a destination by ID.

    This method does not fetch data from the API. It returns a `CloudDestination` object,
    which will be loaded lazily as needed.
    """
    return CloudDestination(
        workspace=self,
        connector_id=destination_id,
    )
```
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
```python
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
    source_config_dict["sourceType"] = source.name.replace("source-", "")

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_sources(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="source",
                resource_name=name,
            )

    deployed_source = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=source_config_dict,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudSource(
        workspace=self,
        connector_id=deployed_source.source_id,
    )
```
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
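For illustration, a sketch that deploys a locally configured connector; `source-faker` and its config are example values only:

```python
import airbyte as ab

# Configure a source locally, then publish it to the workspace.
source = ab.get_source(
    "source-faker",
    config={"count": 1_000},
)

cloud_source = workspace.deploy_source(
    name="My Faker Source",
    source=source,
    unique=True,               # refuse to create a second source with the same name
    random_name_suffix=False,  # set True to append a random suffix to the name
)
print(cloud_source.connector_id)
```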
```python
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
            dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
    else:
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_destinations(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="destination",
                resource_name=name,
            )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
```
Deploy a destination to the workspace.
Returns the newly deployed destination.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
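A sketch of deploying from a plain configuration dictionary (which must include `destinationType`); the connector type and config keys shown are illustrative, not a complete configuration:

```python
# Deploying from a plain configuration dictionary requires a `destinationType` key.
# The config keys below are illustrative placeholders.
cloud_destination = workspace.deploy_destination(
    name="My DuckDB Destination",
    destination={
        "destinationType": "duckdb",
        "destination_path": "/local/warehouse.duckdb",
    },
)

# A locally configured `Destination` object works as well; its hydrated config is reused:
# destination = ab.get_destination("destination-duckdb", config={...})
# cloud_destination = workspace.deploy_destination(name="My DuckDB Destination", destination=destination)
```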
```python
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID `str` or a deployed `CloudSource` object.

    Args:
        source: The source ID or CloudSource object to delete
        safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.
    """
    if not isinstance(source, (str, CloudSource)):
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=source.connector_id if isinstance(source, CloudSource) else source,
        source_name=source.name if isinstance(source, CloudSource) else None,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        safe_mode=safe_mode,
    )
```
Delete a source from the workspace.
You can pass either the source ID as a str or a deployed CloudSource object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
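A sketch of the safety check, assuming `source` is the locally configured source from the earlier example and the placeholder ID stands in for a real one:

```python
# With safe_mode enabled (the default), deletion succeeds only when the source
# name contains "delete-me" or "deleteme" (case insensitive).
scratch_source = workspace.deploy_source(
    name="delete-me: scratch faker source",
    source=source,
)
workspace.permanently_delete_source(scratch_source)

# Deleting a source whose name does not follow that convention requires
# opting out of the safety check explicitly (use with care):
workspace.permanently_delete_source("<source-id>", safe_mode=False)
```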
```python
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either a `CloudDestination` object or the deployed destination ID as a `str`.

    Args:
        destination: The destination ID or CloudDestination object to delete
        safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        destination_id=(
            destination if isinstance(destination, str) else destination.destination_id
        ),
        destination_name=(
            destination.name if isinstance(destination, CloudDestination) else None
        ),
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        safe_mode=safe_mode,
    )
```
Delete a deployed destination from the workspace.
You can pass either a CloudDestination object or the deployed destination ID as a str.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
```python
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a new connection between an already deployed source and destination.

    Returns the newly deployed connection object.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a CloudSource object.
        destination: The deployed destination. You can pass a destination ID or a
            CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
        selected_streams: The selected stream names to sync within the connection.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    source_id: str = source if isinstance(source, str) else source.connector_id
    destination_id: str = (
        destination if isinstance(destination, str) else destination.connector_id
    )

    deployed_connection = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
    )

    return CloudConnection(
        workspace=self,
        connection_id=deployed_connection.connection_id,
        source=deployed_connection.source_id,
        destination=deployed_connection.destination_id,
    )
```
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
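A sketch that wires together the objects deployed in the earlier examples; the stream names and prefix are illustrative:

```python
# Wire an already deployed source and destination into a connection.
connection = workspace.deploy_connection(
    connection_name="Faker to DuckDB",
    source=cloud_source,            # CloudSource object or source ID string
    destination=cloud_destination,  # CloudDestination object or destination ID string
    selected_streams=["users", "products"],
    table_prefix="faker_",
)
print(connection.connection_url)
```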
```python
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection ID or CloudConnection object to delete
        cascade_delete_source: If True, also delete the source after deleting the connection
        cascade_delete_destination: If True, also delete the destination after deleting
            the connection
        safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
            to cascade deletes.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    api_util.delete_connection(
        connection_id=connection.connection_id,
        connection_name=connection.name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        safe_mode=safe_mode,
    )

    if cascade_delete_source:
        self.permanently_delete_source(
            source=connection.source_id,
            safe_mode=safe_mode,
        )
    if cascade_delete_destination:
        self.permanently_delete_destination(
            destination=connection.destination_id,
            safe_mode=safe_mode,
        )
```
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
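A sketch of a full cascade delete, assuming `connection` is the CloudConnection created above:

```python
# Remove the connection and, optionally, the connectors it references.
# As with sources and destinations, safe_mode requires "delete-me"/"deleteme"
# in the resource names unless explicitly disabled.
workspace.permanently_delete_connection(
    connection,
    cascade_delete_source=True,
    cascade_delete_destination=True,
    safe_mode=False,
)
```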
```python
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudConnection]:
    """List connections by name in the workspace.

    TODO: Add pagination support
    """
    connections = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            connection_response=connection,
        )
        for connection in connections
        if name is None or connection.name == name
    ]
```
List connections by name in the workspace.
TODO: Add pagination support
```python
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudSource]:
    """List all sources in the workspace.

    TODO: Add pagination support
    """
    sources = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            source_response=source,
        )
        for source in sources
        if name is None or source.name == name
    ]
```
List all sources in the workspace.
TODO: Add pagination support
```python
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace.

    TODO: Add pagination support
    """
    destinations = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        client_id=self.client_id,
        client_secret=self.client_secret,
    )
    return [
        CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            destination_response=destination,
        )
        for destination in destinations
        if name is None or destination.name == name
    ]
```
List all destinations in the workspace.
TODO: Add pagination support
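A brief sketch of the three list helpers, reusing the illustrative names from the earlier examples:

```python
# List everything in the workspace (note: pagination is not yet supported).
all_connections = workspace.list_connections()
all_sources = workspace.list_sources()
all_destinations = workspace.list_destinations()

# Or filter by exact display name.
matches = workspace.list_sources(name="My Faker Source")
if matches:
    print(matches[0].connector_id)
```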
```python
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
        AirbyteDuplicateResourcesError: If unique=True and name already exists
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing = self.list_custom_source_definitions(
            definition_type="yaml" if is_yaml else "docker",
        )
        if any(d.name == name for d in existing):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if is_yaml:
        manifest_dict: dict[str, Any]
        if isinstance(manifest_yaml, Path):
            manifest_dict = yaml.safe_load(manifest_yaml.read_text())
        elif isinstance(manifest_yaml, str):
            manifest_dict = yaml.safe_load(manifest_yaml)
        elif manifest_yaml is not None:
            manifest_dict = manifest_yaml
        else:
            raise exc.PyAirbyteInputError(
                message="manifest_yaml is required for YAML connectors",
                context={"name": name},
            )

        if pre_validate:
            api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

        result = api_util.create_custom_yaml_source_definition(
            name=name,
            workspace_id=self.workspace_id,
            manifest=manifest_dict,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
            self, result
        )

        # Set testing values if provided
        if testing_values is not None:
            custom_definition.set_testing_values(testing_values)

        return custom_definition

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
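A sketch of publishing a YAML-based definition; the file path and testing values are placeholders:

```python
from pathlib import Path

# Publish a low-code (YAML manifest) connector definition from a file on disk.
# A dict or a raw YAML string is also accepted for `manifest_yaml`.
definition = workspace.publish_custom_source_definition(
    name="My Custom API Source",
    manifest_yaml=Path("manifest.yaml"),
    pre_validate=True,  # validate the manifest client-side before publishing
    testing_values={"api_key": "<api-key-for-test-reads>"},  # optional Builder test config
)
print(definition.name)
```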
```python
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
    """List custom source connector definitions.

    Args:
        definition_type: Connector type to list ("yaml" or "docker"). Required.

    Returns:
        List of CustomCloudSourceDefinition objects matching the specified type
    """
    if definition_type == "yaml":
        yaml_definitions = api_util.list_custom_yaml_source_definitions(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return [
            CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
            for d in yaml_definitions
        ]

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
```python
def get_custom_source_definition(
    self,
    definition_id: str,
    *,
    definition_type: Literal["yaml", "docker"],
) -> CustomCloudSourceDefinition:
    """Get a specific custom source definition by ID.

    Args:
        definition_id: The definition ID
        definition_type: Connector type ("yaml" or "docker"). Required.

    Returns:
        CustomCloudSourceDefinition object
    """
    if definition_type == "yaml":
        result = api_util.get_custom_yaml_source_definition(
            workspace_id=self.workspace_id,
            definition_id=definition_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
        )
        return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
```
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
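For completeness, a brief sketch of listing and fetching definitions; the definition ID is a placeholder:

```python
# Only YAML (manifest-based) definitions are currently supported.
yaml_definitions = workspace.list_custom_source_definitions(definition_type="yaml")
for d in yaml_definitions:
    print(d.name)

# Retrieve a single definition by its ID.
definition = workspace.get_custom_source_definition(
    definition_id="<definition-id>",
    definition_type="yaml",
)
```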
```python
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
        return result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            name=name,
        )
        self._connection_info = updated_response
        return self

    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            prefix=prefix,
        )
        self._connection_info = updated_response
        return self

    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            configurations=configurations,
        )
        self._connection_info = updated_response
        return self

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
```
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
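A short usage sketch, reusing the `workspace` object from above; the connection ID is a placeholder:

```python
# Obtain a lazily loaded handle to an existing connection...
connection = workspace.get_connection(connection_id="456")

# ...then inspect it. Property access triggers the API fetch on first use.
print(connection.name)
print(connection.stream_names)
print(connection.table_prefix)
print(connection.connection_url)
print(connection.job_history_url)
```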
```python
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """It is not recommended to create a `CloudConnection` object directly.

    Instead, use `CloudWorkspace.get_connection()` to create a connection object.
    """
    self.connection_id = connection_id
    """The ID of the connection."""

    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self._source_id = source
    """The ID of the source."""

    self._destination_id = destination
    """The ID of the destination."""

    self._connection_info: ConnectionResponse | None = None
    """The connection info object. (Cached.)"""

    self._cloud_source_object: CloudSource | None = None
    """The source object. (Cached.)"""

    self._cloud_destination_object: CloudDestination | None = None
    """The destination object. (Cached.)"""
```
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id
The ID of the source.
    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object
Get the source object.
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id
The ID of the destination.
    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object
Get the destination object.
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""
The table prefix.
    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
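For example, a minimal sketch that prints both web links, assuming an existing `connection` object:

```python
# Print handy web links for an existing `connection` object.
print(connection.connection_url)      # Connection overview page
print(connection.job_history_url)     # Job history / timeline page
```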
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result
Run a sync.
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection."""
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]
Get the previous sync logs for a connection.
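For example, a sketch that lists recent jobs and their statuses, assuming an existing `connection` object:

```python
# List the five most recent sync jobs for the connection.
for result in connection.get_previous_sync_logs(limit=5):
    print(f"Job {result.job_id}: {result.get_job_status()}")
```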
    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
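For example, a sketch assuming an existing `connection` object; the job ID `12345` is a placeholder:

```python
# Fetch a specific job by ID (lazy loaded).
sync_result = connection.get_sync_result(job_id=12345)

# Or fetch the most recent job, handling the case where none exist yet.
latest = connection.get_sync_result()
if latest is None:
    print("No sync jobs found for this connection.")
```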
    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            name=name,
        )
        self._connection_info = updated_response
        return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
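For example, a minimal sketch assuming an existing `connection` object; the new name is illustrative:

```python
# Rename the connection and confirm the new display name.
connection = connection.rename("Postgres -> Snowflake (prod)")
print(connection.name)
```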
    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            prefix=prefix,
        )
        self._connection_info = updated_response
        return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
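For example, a minimal sketch assuming an existing `connection` object; the `"raw_"` prefix is illustrative:

```python
# Prefix destination tables with "raw_" on future syncs.
connection = connection.set_table_prefix("raw_")
print(connection.table_prefix)
```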
    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            configurations=configurations,
        )
        self._connection_info = updated_response
        return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
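For example, a sketch assuming an existing `connection` object; the `"users"` and `"orders"` stream names are illustrative:

```python
# Caution: changing stream selection can break an existing connection.
connection = connection.set_selected_streams(["users", "orders"])
print(connection.stream_names)
```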
    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
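For example, a minimal sketch assuming an existing `connection` object. Note that this operation is irreversible:

```python
# Permanently delete the connection and its source, but keep the destination.
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=False,
)
```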
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: JobResponse | None = None
    _connection_response: ConnectionResponse | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
        E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            api_root=self.workspace.api_root,
            connection_id=self.connection.connection_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info: ConnectionResponse = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> JobResponse:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = api_util.get_job_info(
            job_id=self.job_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
        E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES
Check if the sync job is complete.
    def get_job_status(self) -> JobStatusEnum:
        """Return the status of the sync job."""
        return self._fetch_latest_job_info().status
Return the status of the sync job.
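For example, a quick progress check, assuming an existing `sync_result` object from an earlier sync:

```python
# Report whether the job has reached a final status.
if sync_result.is_job_complete():
    print(f"Job finished with status: {sync_result.get_job_status()}")
else:
    print("Job is still running...")
```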
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0
Return the number of bytes synced.
    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0
Return the number of records synced.
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise
Return the start time of the sync job in UTC.
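For example, a sketch reporting simple volume metrics, assuming an existing `sync_result` object for a completed job:

```python
# Summarize what the job moved and when it started.
print(
    f"Synced {sync_result.records_synced} records "
    f"({sync_result.bytes_synced} bytes), started at {sync_result.start_time}."
)
```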
    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]
Return a list of attempts for this sync job.
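For example, a sketch that iterates the attempts of a job; it assumes the `attempt_number` field shown in the constructor call above is readable on `SyncAttempt` objects:

```python
# List attempts for the job by attempt number.
for attempt in sync_result.get_attempts():
    print(f"Attempt #{attempt.attempt_number} of job {sync_result.job_id}")
```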
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )
Raise an exception if the sync job failed.
By default, this method will use the latest status available. If you want to refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method will raise an AirbyteConnectionSyncError.
Otherwise, do nothing.
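For example, a minimal sketch assuming an existing `sync_result` object:

```python
# Re-fetch the job status and raise AirbyteConnectionSyncError if it failed.
sync_result.raise_failure_status(refresh_status=True)
```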
    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)
Wait for a job to finish running.
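For example, a sketch that starts a sync without blocking and then waits explicitly; it assumes an existing `connection` object, and the 600-second timeout is illustrative:

```python
# Kick off the sync without waiting, then wait up to 10 minutes for it.
sync_result = connection.run_sync(wait=False)
final_status = sync_result.wait_for_completion(
    wait_timeout=600,
    raise_failure=True,   # Raise AirbyteConnectionSyncError if the job failed
    raise_timeout=False,  # Return the non-final status instead of raising on timeout
)
print(final_status)
```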
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache
Return a SQL Cache object for working with the data in a SQL-based destination.
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()
Return a SQL Engine for querying a SQL-based destination.
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
Return the SQL table name of the named stream.
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)
Return a SQLAlchemy table object for the named stream.
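For example, a sketch that queries the destination through the SQL engine; it assumes an existing `sync_result` object and an illustrative `"users"` stream name, and depending on the destination you may need to schema-qualify the table name:

```python
import sqlalchemy

# Count rows in the destination table backing the "users" stream.
engine = sync_result.get_sql_engine()
table_name = sync_result.get_sql_table_name("users")
with engine.connect() as conn:
    row_count = conn.execute(
        sqlalchemy.text(f"SELECT COUNT(*) FROM {table_name}")
    ).scalar()
print(f"{table_name} contains {row_count} rows")
```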
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
        (catalog information) to the `CachedDataset` object via the "Get stream properties"
        API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information
(catalog information) to the CachedDataset object via the "Get stream properties"
API: https://reference.airbyte.com/reference/getstreamproperties
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()
Return the SQL database name.
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name
Return the SQL schema name.
    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names
Return the list of stream names.
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names
property and get_dataset() method.
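For example, a minimal sketch that walks the mapping, assuming an existing `sync_result` object:

```python
# Iterate the stream-name -> CachedDataset mapping.
for stream_name, dataset in sync_result.streams.items():
    print(f"Stream '{stream_name}' is available as dataset: {dataset}")
```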
class JobStatusEnum(str, Enum):
    PENDING = 'pending'
    RUNNING = 'running'
    INCOMPLETE = 'incomplete'
    FAILED = 'failed'
    SUCCEEDED = 'succeeded'
    CANCELLED = 'cancelled'
An enumeration of Airbyte Cloud job statuses.
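For example, a minimal sketch comparing a job status against the enum members, assuming an existing `sync_result` object:

```python
from airbyte.cloud import JobStatusEnum

# Branch on the final status of a sync job.
status = sync_result.get_job_status()
if status == JobStatusEnum.SUCCEEDED:
    print("Sync succeeded.")
elif status in (JobStatusEnum.FAILED, JobStatusEnum.CANCELLED, JobStatusEnum.INCOMPLETE):
    print(f"Sync did not complete successfully: {status}")
```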