airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

from typing_extensions import deprecated

from airbyte._util import api_util
from airbyte.cloud._connection_state import (
    ConnectionStateResponse,
    _get_stream_list,
    _match_stream,
)
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.sync_results import SyncResult
from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError


logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse, JobTypeEnum

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:  # noqa: PLR0904 # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(
        self,
        *,
        force_refresh: bool = False,
        verify: bool = True,
    ) -> ConnectionResponse:
        """Fetch and cache connection info from the API.

        By default, this method will only fetch from the API if connection info is not
        already cached. It also verifies that the connection belongs to the expected
        workspace unless verification is explicitly disabled.

        Args:
            force_refresh: If True, always fetch from the API even if cached.
                If False (default), only fetch if not already cached.
            verify: If True (default), verify that the connection is valid (e.g., that
                the workspace_id matches this object's workspace). Raises an error if
                validation fails.

        Returns:
            The ConnectionResponse from the API.

        Raises:
            AirbyteWorkspaceMismatchError: If verify is True and the connection's
                workspace_id doesn't match the expected workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        if not force_refresh and self._connection_info is not None:
            # Use cached info, but still verify if requested
            if verify:
                self._verify_workspace_match(self._connection_info)
            return self._connection_info

        # Fetch from API
        connection_info = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )

        # Cache the result first (before verification may raise)
        self._connection_info = connection_info

        # Verify if requested
        if verify:
            self._verify_workspace_match(connection_info)

        return connection_info

    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
        """Verify that the connection belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
        """
        if connection_info.workspace_id != self.workspace.workspace_id:
            raise AirbyteWorkspaceMismatchError(
                resource_type="connection",
                resource_id=self.connection_id,
                workspace=self.workspace,
                expected_workspace_id=self.workspace.workspace_id,
                actual_workspace_id=connection_info.workspace_id,
                message=(
                    f"Connection '{self.connection_id}' belongs to workspace "
                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
                ),
            )

    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001  # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            # Lazily resolve from connection info when not provided at construction.
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            # Lazily resolve from connection info when not provided at construction.
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: If True (default), block until the job completes, raising on
                failure or timeout. If False, return immediately after starting
                the job.
            wait_timeout: Maximum number of seconds to wait when `wait` is True.

        Returns:
            A SyncResult object for the started (or completed) job.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 20,
        offset: int | None = None,
        from_tail: bool = True,
        job_type: JobTypeEnum | None = None,
    ) -> list[SyncResult]:
        """Get previous sync jobs for a connection with pagination support.

        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
        rows_synced, start_time). Full log text can be fetched lazily via
        `SyncResult.get_full_log_text()`.

        Args:
            limit: Maximum number of jobs to return. Defaults to 20.
            offset: Number of jobs to skip from the beginning. Defaults to None (0).
            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
                If False, returns jobs ordered oldest-first (createdAt ASC).
                Defaults to True.
            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
                If not specified, defaults to sync and reset jobs only (API default behavior).

        Returns:
            A list of SyncResult objects representing the sync jobs.
        """
        order_by = (
            api_util.JOB_ORDER_BY_CREATED_AT_DESC
            if from_tail
            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
        )
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            offset=offset,
            order_by=order_by,
            job_type=job_type,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Artifacts

    @deprecated("Use 'dump_raw_state()' instead.")
    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Deprecated. Use `dump_raw_state()` instead."""
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        if state_response.get("stateType") == "not_set":
            return None
        return state_response.get("streamState", [])

    def dump_raw_state(self) -> dict[str, Any]:
        """Dump the full raw state for this connection.

        Returns the connection's sync state as a raw dictionary from the API.
        The result includes stateType, connectionId, and all state data.

        The output of this method can be passed directly to `import_raw_state()`
        on the same or a different connection (connectionId is overridden on import).

        Returns:
            The full connection state as a dictionary.
        """
        return api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )

    def import_raw_state(
        self,
        connection_state_dict: dict[str, Any],
    ) -> dict[str, Any]:
        """Import (restore) the full raw state for this connection.

        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Replaces the entire connection state with the provided state blob.
        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
        The `connectionId` in the blob is always overridden with this connection's
        ID, making state blobs portable across connections.

        Args:
            connection_state_dict: The full connection state dict to import.
                Must include:
                - stateType: "global", "stream", or "legacy"
                - One of: state (legacy), streamState (stream), globalState (global)

        Returns:
            The updated connection state as a dictionary.

        Raises:
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        return api_util.replace_connection_state(
            connection_id=self.connection_id,
            connection_state_dict=connection_state_dict,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )

    def get_stream_state(
        self,
        stream_name: str,
        stream_namespace: str | None = None,
    ) -> dict[str, Any] | None:
        """Get the state blob for a single stream within this connection.

        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
        not the full connection state envelope.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Args:
            stream_name: The name of the stream to get state for.
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Returns:
            The stream's state blob as a dictionary, or None if the stream is not found.
        """
        state_data = self.dump_raw_state()
        result = ConnectionStateResponse(**state_data)

        streams = _get_stream_list(result)
        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]

        if not matching:
            # Log (rather than raise) so callers can treat "no state yet" as None.
            available = [s.stream_descriptor.name for s in streams]
            logger.warning(
                "Stream '%s' not found in connection state for connection '%s'. "
                "Available streams: %s",
                stream_name,
                self.connection_id,
                available,
            )
            return None

        return matching[0].stream_state

    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        state_data = self.dump_raw_state()
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Pull the raw (dict-form) stream entries from wherever this state type keeps them.
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # raw_streams and the parsed streams are parallel lists; iterate them in
        # lockstep so the matching entry can be swapped out in raw (dict) form.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full state envelope with only the stream list replaced.
        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)

    @deprecated("Use 'dump_raw_catalog()' instead.")
    def get_catalog_artifact(self) -> dict[str, Any] | None:
        """Get the configured catalog for this connection.

        Returns the full configured catalog (syncCatalog) for this connection,
        including stream schemas, sync modes, cursor fields, and primary keys.

        Uses the Config API endpoint: POST /v1/web_backend/connections/get

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        return self.dump_raw_catalog()

    def dump_raw_catalog(self) -> dict[str, Any] | None:
        """Dump the full configured catalog (syncCatalog) for this connection.

        Returns the raw catalog dict as returned by the API, including stream
        schemas, sync modes, cursor fields, and primary keys.

        The returned dict can be passed to `import_raw_catalog()` on this or
        another connection to restore or clone the catalog configuration.

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        connection_response = api_util.get_connection_catalog(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return connection_response.get("syncCatalog")

    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
        """Replace the configured catalog for this connection.

        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Accepts a configured catalog dict and replaces the connection's entire
        catalog with it. All other connection settings remain unchanged.

        The catalog shape should match the output of `dump_raw_catalog()`:
        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.

        Args:
            catalog: The configured catalog dict to set.
        """
        api_util.replace_connection_catalog(
            connection_id=self.connection_id,
            configured_catalog_dict=catalog,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            name=name,
        )
        self._connection_info = updated_response
        return self

    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            prefix=prefix,
        )
        self._connection_info = updated_response
        return self

    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            configurations=configurations,
        )
        self._connection_info = updated_response
        return self

    # Enable/Disable

    @property
    def enabled(self) -> bool:
        """Get the current enabled status of the connection.

        This property always fetches fresh data from the API to ensure accuracy,
        as another process or user may have toggled the setting.

        Returns:
            True if the connection status is 'active', False otherwise.
        """
        connection_info = self._fetch_connection_info(force_refresh=True)
        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set the enabled status of the connection.

        Args:
            value: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
        """
        self.set_enabled(enabled=value)

    def set_enabled(
        self,
        *,
        enabled: bool,
        ignore_noop: bool = True,
    ) -> None:
        """Set the enabled status of the connection.

        Args:
            enabled: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
            ignore_noop: If True (default), silently return if the connection is already
                in the requested state. If False, raise ValueError when the requested
                state matches the current state.

        Raises:
            ValueError: If ignore_noop is False and the connection is already in the
                requested state.
        """
        # Always fetch fresh data to check current status
        connection_info = self._fetch_connection_info(force_refresh=True)
        current_status = connection_info.status
        desired_status = (
            api_util.models.ConnectionStatusEnum.ACTIVE
            if enabled
            else api_util.models.ConnectionStatusEnum.INACTIVE
        )

        if current_status == desired_status:
            if ignore_noop:
                return
            raise ValueError(
                f"Connection is already {'enabled' if enabled else 'disabled'}. "
                f"Current status: {current_status}"
            )

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            status=desired_status,
        )
        self._connection_info = updated_response

    # Scheduling

    def set_schedule(
        self,
        cron_expression: str,
    ) -> None:
        """Set a cron schedule for the connection.

        Args:
            cron_expression: A cron expression defining when syncs should run.

        Examples:
            - "0 0 * * *" - Daily at midnight UTC
            - "0 */6 * * *" - Every 6 hours
            - "0 0 * * 0" - Weekly on Sunday at midnight UTC
        """
        schedule = api_util.models.AirbyteAPIConnectionSchedule(
            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
            cron_expression=cron_expression,
        )
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=schedule,
        )
        self._connection_info = updated_response

    def set_manual_schedule(self) -> None:
        """Set the connection to manual scheduling.

        Disables automatic syncs. Syncs will only run when manually triggered.
        """
        schedule = api_util.models.AirbyteAPIConnectionSchedule(
            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
        )
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=schedule,
        )
        self._connection_info = updated_response

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
32class CloudConnection: # noqa: PLR0904 # Too many public methods 33 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 34 35 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 36 """ 37 38 def __init__( 39 self, 40 workspace: CloudWorkspace, 41 connection_id: str, 42 source: str | None = None, 43 destination: str | None = None, 44 ) -> None: 45 """It is not recommended to create a `CloudConnection` object directly. 46 47 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 48 """ 49 self.connection_id = connection_id 50 """The ID of the connection.""" 51 52 self.workspace = workspace 53 """The workspace that the connection belongs to.""" 54 55 self._source_id = source 56 """The ID of the source.""" 57 58 self._destination_id = destination 59 """The ID of the destination.""" 60 61 self._connection_info: ConnectionResponse | None = None 62 """The connection info object. (Cached.)""" 63 64 self._cloud_source_object: CloudSource | None = None 65 """The source object. (Cached.)""" 66 67 self._cloud_destination_object: CloudDestination | None = None 68 """The destination object. (Cached.)""" 69 70 def _fetch_connection_info( 71 self, 72 *, 73 force_refresh: bool = False, 74 verify: bool = True, 75 ) -> ConnectionResponse: 76 """Fetch and cache connection info from the API. 77 78 By default, this method will only fetch from the API if connection info is not 79 already cached. It also verifies that the connection belongs to the expected 80 workspace unless verification is explicitly disabled. 81 82 Args: 83 force_refresh: If True, always fetch from the API even if cached. 84 If False (default), only fetch if not already cached. 85 verify: If True (default), verify that the connection is valid (e.g., that 86 the workspace_id matches this object's workspace). Raises an error if 87 validation fails. 88 89 Returns: 90 The ConnectionResponse from the API. 
91 92 Raises: 93 AirbyteWorkspaceMismatchError: If verify is True and the connection's 94 workspace_id doesn't match the expected workspace. 95 AirbyteMissingResourceError: If the connection doesn't exist. 96 """ 97 if not force_refresh and self._connection_info is not None: 98 # Use cached info, but still verify if requested 99 if verify: 100 self._verify_workspace_match(self._connection_info) 101 return self._connection_info 102 103 # Fetch from API 104 connection_info = api_util.get_connection( 105 workspace_id=self.workspace.workspace_id, 106 connection_id=self.connection_id, 107 api_root=self.workspace.api_root, 108 client_id=self.workspace.client_id, 109 client_secret=self.workspace.client_secret, 110 bearer_token=self.workspace.bearer_token, 111 ) 112 113 # Cache the result first (before verification may raise) 114 self._connection_info = connection_info 115 116 # Verify if requested 117 if verify: 118 self._verify_workspace_match(connection_info) 119 120 return connection_info 121 122 def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None: 123 """Verify that the connection belongs to the expected workspace. 124 125 Raises: 126 AirbyteWorkspaceMismatchError: If the workspace IDs don't match. 127 """ 128 if connection_info.workspace_id != self.workspace.workspace_id: 129 raise AirbyteWorkspaceMismatchError( 130 resource_type="connection", 131 resource_id=self.connection_id, 132 workspace=self.workspace, 133 expected_workspace_id=self.workspace.workspace_id, 134 actual_workspace_id=connection_info.workspace_id, 135 message=( 136 f"Connection '{self.connection_id}' belongs to workspace " 137 f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'." 138 ), 139 ) 140 141 def check_is_valid(self) -> bool: 142 """Check if this connection exists and belongs to the expected workspace. 
143 144 This method fetches connection info from the API (if not already cached) and 145 verifies that the connection's workspace_id matches the workspace associated 146 with this CloudConnection object. 147 148 Returns: 149 True if the connection exists and belongs to the expected workspace. 150 151 Raises: 152 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 153 AirbyteMissingResourceError: If the connection doesn't exist. 154 """ 155 self._fetch_connection_info(force_refresh=False, verify=True) 156 return True 157 158 @classmethod 159 def _from_connection_response( 160 cls, 161 workspace: CloudWorkspace, 162 connection_response: ConnectionResponse, 163 ) -> CloudConnection: 164 """Create a CloudConnection from a ConnectionResponse.""" 165 result = cls( 166 workspace=workspace, 167 connection_id=connection_response.connection_id, 168 source=connection_response.source_id, 169 destination=connection_response.destination_id, 170 ) 171 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 172 return result 173 174 # Properties 175 176 @property 177 def name(self) -> str | None: 178 """Get the display name of the connection, if available. 179 180 E.g. "My Postgres to Snowflake", not the connection ID. 
181 """ 182 if not self._connection_info: 183 self._connection_info = self._fetch_connection_info() 184 185 return self._connection_info.name 186 187 @property 188 def source_id(self) -> str: 189 """The ID of the source.""" 190 if not self._source_id: 191 if not self._connection_info: 192 self._connection_info = self._fetch_connection_info() 193 194 self._source_id = self._connection_info.source_id 195 196 return self._source_id 197 198 @property 199 def source(self) -> CloudSource: 200 """Get the source object.""" 201 if self._cloud_source_object: 202 return self._cloud_source_object 203 204 self._cloud_source_object = CloudSource( 205 workspace=self.workspace, 206 connector_id=self.source_id, 207 ) 208 return self._cloud_source_object 209 210 @property 211 def destination_id(self) -> str: 212 """The ID of the destination.""" 213 if not self._destination_id: 214 if not self._connection_info: 215 self._connection_info = self._fetch_connection_info() 216 217 self._destination_id = self._connection_info.destination_id 218 219 return self._destination_id 220 221 @property 222 def destination(self) -> CloudDestination: 223 """Get the destination object.""" 224 if self._cloud_destination_object: 225 return self._cloud_destination_object 226 227 self._cloud_destination_object = CloudDestination( 228 workspace=self.workspace, 229 connector_id=self.destination_id, 230 ) 231 return self._cloud_destination_object 232 233 @property 234 def stream_names(self) -> list[str]: 235 """The stream names.""" 236 if not self._connection_info: 237 self._connection_info = self._fetch_connection_info() 238 239 return [stream.name for stream in self._connection_info.configurations.streams or []] 240 241 @property 242 def table_prefix(self) -> str: 243 """The table prefix.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return self._connection_info.prefix or "" 248 249 @property 250 def connection_url(self) -> str | None: 251 """The web 
URL to the connection.""" 252 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 253 254 @property 255 def job_history_url(self) -> str | None: 256 """The URL to the job history for the connection.""" 257 return f"{self.connection_url}/timeline" 258 259 # Run Sync 260 261 def run_sync( 262 self, 263 *, 264 wait: bool = True, 265 wait_timeout: int = 300, 266 ) -> SyncResult: 267 """Run a sync.""" 268 connection_response = api_util.run_connection( 269 connection_id=self.connection_id, 270 api_root=self.workspace.api_root, 271 workspace_id=self.workspace.workspace_id, 272 client_id=self.workspace.client_id, 273 client_secret=self.workspace.client_secret, 274 bearer_token=self.workspace.bearer_token, 275 ) 276 sync_result = SyncResult( 277 workspace=self.workspace, 278 connection=self, 279 job_id=connection_response.job_id, 280 ) 281 282 if wait: 283 sync_result.wait_for_completion( 284 wait_timeout=wait_timeout, 285 raise_failure=True, 286 raise_timeout=True, 287 ) 288 289 return sync_result 290 291 def __repr__(self) -> str: 292 """String representation of the connection.""" 293 return ( 294 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 295 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 296 ) 297 298 # Logs 299 300 def get_previous_sync_logs( 301 self, 302 *, 303 limit: int = 20, 304 offset: int | None = None, 305 from_tail: bool = True, 306 job_type: JobTypeEnum | None = None, 307 ) -> list[SyncResult]: 308 """Get previous sync jobs for a connection with pagination support. 309 310 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 311 rows_synced, start_time). Full log text can be fetched lazily via 312 `SyncResult.get_full_log_text()`. 313 314 Args: 315 limit: Maximum number of jobs to return. Defaults to 20. 316 offset: Number of jobs to skip from the beginning. Defaults to None (0). 
317 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 318 If False, returns jobs ordered oldest-first (createdAt ASC). 319 Defaults to True. 320 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 321 If not specified, defaults to sync and reset jobs only (API default behavior). 322 323 Returns: 324 A list of SyncResult objects representing the sync jobs. 325 """ 326 order_by = ( 327 api_util.JOB_ORDER_BY_CREATED_AT_DESC 328 if from_tail 329 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 330 ) 331 sync_logs: list[JobResponse] = api_util.get_job_logs( 332 connection_id=self.connection_id, 333 api_root=self.workspace.api_root, 334 workspace_id=self.workspace.workspace_id, 335 limit=limit, 336 offset=offset, 337 order_by=order_by, 338 job_type=job_type, 339 client_id=self.workspace.client_id, 340 client_secret=self.workspace.client_secret, 341 bearer_token=self.workspace.bearer_token, 342 ) 343 return [ 344 SyncResult( 345 workspace=self.workspace, 346 connection=self, 347 job_id=sync_log.job_id, 348 _latest_job_info=sync_log, 349 ) 350 for sync_log in sync_logs 351 ] 352 353 def get_sync_result( 354 self, 355 job_id: int | None = None, 356 ) -> SyncResult | None: 357 """Get the sync result for the connection. 358 359 If `job_id` is not provided, the most recent sync job will be used. 360 361 Returns `None` if job_id is omitted and no previous jobs are found. 362 """ 363 if job_id is None: 364 # Get the most recent sync job 365 results = self.get_previous_sync_logs( 366 limit=1, 367 ) 368 if results: 369 return results[0] 370 371 return None 372 373 # Get the sync job by ID (lazy loaded) 374 return SyncResult( 375 workspace=self.workspace, 376 connection=self, 377 job_id=job_id, 378 ) 379 380 # Artifacts 381 382 @deprecated("Use 'dump_raw_state()' instead.") 383 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 384 """Deprecated. 
Use `dump_raw_state()` instead.""" 385 state_response = api_util.get_connection_state( 386 connection_id=self.connection_id, 387 api_root=self.workspace.api_root, 388 client_id=self.workspace.client_id, 389 client_secret=self.workspace.client_secret, 390 bearer_token=self.workspace.bearer_token, 391 ) 392 if state_response.get("stateType") == "not_set": 393 return None 394 return state_response.get("streamState", []) 395 396 def dump_raw_state(self) -> dict[str, Any]: 397 """Dump the full raw state for this connection. 398 399 Returns the connection's sync state as a raw dictionary from the API. 400 The result includes stateType, connectionId, and all state data. 401 402 The output of this method can be passed directly to `import_raw_state()` 403 on the same or a different connection (connectionId is overridden on import). 404 405 Returns: 406 The full connection state as a dictionary. 407 """ 408 return api_util.get_connection_state( 409 connection_id=self.connection_id, 410 api_root=self.workspace.api_root, 411 client_id=self.workspace.client_id, 412 client_secret=self.workspace.client_secret, 413 bearer_token=self.workspace.bearer_token, 414 ) 415 416 def import_raw_state( 417 self, 418 connection_state_dict: dict[str, Any], 419 ) -> dict[str, Any]: 420 """Import (restore) the full raw state for this connection. 421 422 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 423 > could result in broken connections, and/or incorrect sync behavior. 424 425 Replaces the entire connection state with the provided state blob. 426 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 427 428 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 429 The `connectionId` in the blob is always overridden with this connection's 430 ID, making state blobs portable across connections. 431 432 Args: 433 connection_state_dict: The full connection state dict to import. 
Must include: 434 - stateType: "global", "stream", or "legacy" 435 - One of: state (legacy), streamState (stream), globalState (global) 436 437 Returns: 438 The updated connection state as a dictionary. 439 440 Raises: 441 AirbyteConnectionSyncActiveError: If a sync is currently running on this 442 connection (HTTP 423). Wait for the sync to complete before retrying. 443 """ 444 return api_util.replace_connection_state( 445 connection_id=self.connection_id, 446 connection_state_dict=connection_state_dict, 447 api_root=self.workspace.api_root, 448 client_id=self.workspace.client_id, 449 client_secret=self.workspace.client_secret, 450 bearer_token=self.workspace.bearer_token, 451 ) 452 453 def get_stream_state( 454 self, 455 stream_name: str, 456 stream_namespace: str | None = None, 457 ) -> dict[str, Any] | None: 458 """Get the state blob for a single stream within this connection. 459 460 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 461 not the full connection state envelope. 462 463 This is compatible with `stream`-type state and stream-level entries 464 within a `global`-type state. It is not compatible with `legacy` state. 465 To get or set the entire connection-level state artifact, use 466 `dump_raw_state` and `import_raw_state` instead. 467 468 Args: 469 stream_name: The name of the stream to get state for. 470 stream_namespace: The source-side stream namespace. This refers to the 471 namespace from the source (e.g., database schema), not any destination 472 namespace override set in connection advanced settings. 473 474 Returns: 475 The stream's state blob as a dictionary, or None if the stream is not found. 
476 """ 477 state_data = self.dump_raw_state() 478 result = ConnectionStateResponse(**state_data) 479 480 streams = _get_stream_list(result) 481 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 482 483 if not matching: 484 available = [s.stream_descriptor.name for s in streams] 485 logger.warning( 486 "Stream '%s' not found in connection state for connection '%s'. " 487 "Available streams: %s", 488 stream_name, 489 self.connection_id, 490 available, 491 ) 492 return None 493 494 return matching[0].stream_state 495 496 def set_stream_state( 497 self, 498 stream_name: str, 499 state_blob_dict: dict[str, Any], 500 stream_namespace: str | None = None, 501 ) -> None: 502 """Set the state for a single stream within this connection. 503 504 Fetches the current full state, replaces only the specified stream's state, 505 then sends the full updated state back to the API. If the stream does not 506 exist in the current state, it is appended. 507 508 This is compatible with `stream`-type state and stream-level entries 509 within a `global`-type state. It is not compatible with `legacy` state. 510 To get or set the entire connection-level state artifact, use 511 `dump_raw_state` and `import_raw_state` instead. 512 513 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 514 515 Args: 516 stream_name: The name of the stream to update state for. 517 state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}). 518 stream_namespace: The source-side stream namespace. This refers to the 519 namespace from the source (e.g., database schema), not any destination 520 namespace override set in connection advanced settings. 521 522 Raises: 523 PyAirbyteInputError: If the connection state type is not supported for 524 stream-level operations (not_set, legacy). 525 AirbyteConnectionSyncActiveError: If a sync is currently running on this 526 connection (HTTP 423). 
Wait for the sync to complete before retrying. 527 """ 528 state_data = self.dump_raw_state() 529 current = ConnectionStateResponse(**state_data) 530 531 if current.state_type == "not_set": 532 raise PyAirbyteInputError( 533 message="Cannot set stream state: connection has no existing state.", 534 context={"connection_id": self.connection_id}, 535 ) 536 537 if current.state_type == "legacy": 538 raise PyAirbyteInputError( 539 message="Cannot set stream state on a legacy-type connection state.", 540 context={"connection_id": self.connection_id}, 541 ) 542 543 new_stream_entry = { 544 "streamDescriptor": { 545 "name": stream_name, 546 **( 547 { 548 "namespace": stream_namespace, 549 } 550 if stream_namespace 551 else {} 552 ), 553 }, 554 "streamState": state_blob_dict, 555 } 556 557 raw_streams: list[dict[str, Any]] 558 if current.state_type == "stream": 559 raw_streams = state_data.get("streamState", []) 560 elif current.state_type == "global": 561 raw_streams = state_data.get("globalState", {}).get("streamStates", []) 562 else: 563 raw_streams = [] 564 565 streams = _get_stream_list(current) 566 found = False 567 updated_streams_raw: list[dict[str, Any]] = [] 568 for raw_s, parsed_s in zip(raw_streams, streams, strict=False): 569 if _match_stream(parsed_s, stream_name, stream_namespace): 570 updated_streams_raw.append(new_stream_entry) 571 found = True 572 else: 573 updated_streams_raw.append(raw_s) 574 575 if not found: 576 updated_streams_raw.append(new_stream_entry) 577 578 full_state: dict[str, Any] = { 579 **state_data, 580 } 581 582 if current.state_type == "stream": 583 full_state["streamState"] = updated_streams_raw 584 elif current.state_type == "global": 585 original_global = state_data.get("globalState", {}) 586 full_state["globalState"] = { 587 **original_global, 588 "streamStates": updated_streams_raw, 589 } 590 591 self.import_raw_state(full_state) 592 593 @deprecated("Use 'dump_raw_catalog()' instead.") 594 def get_catalog_artifact(self) -> dict[str, 
Any] | None: 595 """Get the configured catalog for this connection. 596 597 Returns the full configured catalog (syncCatalog) for this connection, 598 including stream schemas, sync modes, cursor fields, and primary keys. 599 600 Uses the Config API endpoint: POST /v1/web_backend/connections/get 601 602 Returns: 603 Dictionary containing the configured catalog, or `None` if not found. 604 """ 605 return self.dump_raw_catalog() 606 607 def dump_raw_catalog(self) -> dict[str, Any] | None: 608 """Dump the full configured catalog (syncCatalog) for this connection. 609 610 Returns the raw catalog dict as returned by the API, including stream 611 schemas, sync modes, cursor fields, and primary keys. 612 613 The returned dict can be passed to `import_raw_catalog()` on this or 614 another connection to restore or clone the catalog configuration. 615 616 Returns: 617 Dictionary containing the configured catalog, or `None` if not found. 618 """ 619 connection_response = api_util.get_connection_catalog( 620 connection_id=self.connection_id, 621 api_root=self.workspace.api_root, 622 client_id=self.workspace.client_id, 623 client_secret=self.workspace.client_secret, 624 bearer_token=self.workspace.bearer_token, 625 ) 626 return connection_response.get("syncCatalog") 627 628 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 629 """Replace the configured catalog for this connection. 630 631 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 632 > could result in broken connections, and/or incorrect sync behavior. 633 634 Accepts a configured catalog dict and replaces the connection's entire 635 catalog with it. All other connection settings remain unchanged. 636 637 The catalog shape should match the output of `dump_raw_catalog()`: 638 `{"streams": [{"stream": {...}, "config": {...}}, ...]}`. 639 640 Args: 641 catalog: The configured catalog dict to set. 
642 """ 643 api_util.replace_connection_catalog( 644 connection_id=self.connection_id, 645 configured_catalog_dict=catalog, 646 api_root=self.workspace.api_root, 647 client_id=self.workspace.client_id, 648 client_secret=self.workspace.client_secret, 649 bearer_token=self.workspace.bearer_token, 650 ) 651 652 def rename(self, name: str) -> CloudConnection: 653 """Rename the connection. 654 655 Args: 656 name: New name for the connection 657 658 Returns: 659 Updated CloudConnection object with refreshed info 660 """ 661 updated_response = api_util.patch_connection( 662 connection_id=self.connection_id, 663 api_root=self.workspace.api_root, 664 client_id=self.workspace.client_id, 665 client_secret=self.workspace.client_secret, 666 bearer_token=self.workspace.bearer_token, 667 name=name, 668 ) 669 self._connection_info = updated_response 670 return self 671 672 def set_table_prefix(self, prefix: str) -> CloudConnection: 673 """Set the table prefix for the connection. 674 675 Args: 676 prefix: New table prefix to use when syncing to the destination 677 678 Returns: 679 Updated CloudConnection object with refreshed info 680 """ 681 updated_response = api_util.patch_connection( 682 connection_id=self.connection_id, 683 api_root=self.workspace.api_root, 684 client_id=self.workspace.client_id, 685 client_secret=self.workspace.client_secret, 686 bearer_token=self.workspace.bearer_token, 687 prefix=prefix, 688 ) 689 self._connection_info = updated_response 690 return self 691 692 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 693 """Set the selected streams for the connection. 694 695 This is a destructive operation that can break existing connections if the 696 stream selection is changed incorrectly. Use with caution. 
697 698 Args: 699 stream_names: List of stream names to sync 700 701 Returns: 702 Updated CloudConnection object with refreshed info 703 """ 704 configurations = api_util.build_stream_configurations(stream_names) 705 706 updated_response = api_util.patch_connection( 707 connection_id=self.connection_id, 708 api_root=self.workspace.api_root, 709 client_id=self.workspace.client_id, 710 client_secret=self.workspace.client_secret, 711 bearer_token=self.workspace.bearer_token, 712 configurations=configurations, 713 ) 714 self._connection_info = updated_response 715 return self 716 717 # Enable/Disable 718 719 @property 720 def enabled(self) -> bool: 721 """Get the current enabled status of the connection. 722 723 This property always fetches fresh data from the API to ensure accuracy, 724 as another process or user may have toggled the setting. 725 726 Returns: 727 True if the connection status is 'active', False otherwise. 728 """ 729 connection_info = self._fetch_connection_info(force_refresh=True) 730 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE 731 732 @enabled.setter 733 def enabled(self, value: bool) -> None: 734 """Set the enabled status of the connection. 735 736 Args: 737 value: True to enable (set status to 'active'), False to disable 738 (set status to 'inactive'). 739 """ 740 self.set_enabled(enabled=value) 741 742 def set_enabled( 743 self, 744 *, 745 enabled: bool, 746 ignore_noop: bool = True, 747 ) -> None: 748 """Set the enabled status of the connection. 749 750 Args: 751 enabled: True to enable (set status to 'active'), False to disable 752 (set status to 'inactive'). 753 ignore_noop: If True (default), silently return if the connection is already 754 in the requested state. If False, raise ValueError when the requested 755 state matches the current state. 756 757 Raises: 758 ValueError: If ignore_noop is False and the connection is already in the 759 requested state. 
760 """ 761 # Always fetch fresh data to check current status 762 connection_info = self._fetch_connection_info(force_refresh=True) 763 current_status = connection_info.status 764 desired_status = ( 765 api_util.models.ConnectionStatusEnum.ACTIVE 766 if enabled 767 else api_util.models.ConnectionStatusEnum.INACTIVE 768 ) 769 770 if current_status == desired_status: 771 if ignore_noop: 772 return 773 raise ValueError( 774 f"Connection is already {'enabled' if enabled else 'disabled'}. " 775 f"Current status: {current_status}" 776 ) 777 778 updated_response = api_util.patch_connection( 779 connection_id=self.connection_id, 780 api_root=self.workspace.api_root, 781 client_id=self.workspace.client_id, 782 client_secret=self.workspace.client_secret, 783 bearer_token=self.workspace.bearer_token, 784 status=desired_status, 785 ) 786 self._connection_info = updated_response 787 788 # Scheduling 789 790 def set_schedule( 791 self, 792 cron_expression: str, 793 ) -> None: 794 """Set a cron schedule for the connection. 795 796 Args: 797 cron_expression: A cron expression defining when syncs should run. 798 799 Examples: 800 - "0 0 * * *" - Daily at midnight UTC 801 - "0 */6 * * *" - Every 6 hours 802 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 803 """ 804 schedule = api_util.models.AirbyteAPIConnectionSchedule( 805 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 806 cron_expression=cron_expression, 807 ) 808 updated_response = api_util.patch_connection( 809 connection_id=self.connection_id, 810 api_root=self.workspace.api_root, 811 client_id=self.workspace.client_id, 812 client_secret=self.workspace.client_secret, 813 bearer_token=self.workspace.bearer_token, 814 schedule=schedule, 815 ) 816 self._connection_info = updated_response 817 818 def set_manual_schedule(self) -> None: 819 """Set the connection to manual scheduling. 820 821 Disables automatic syncs. Syncs will only run when manually triggered. 
822 """ 823 schedule = api_util.models.AirbyteAPIConnectionSchedule( 824 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 825 ) 826 updated_response = api_util.patch_connection( 827 connection_id=self.connection_id, 828 api_root=self.workspace.api_root, 829 client_id=self.workspace.client_id, 830 client_secret=self.workspace.client_secret, 831 bearer_token=self.workspace.bearer_token, 832 schedule=schedule, 833 ) 834 self._connection_info = updated_response 835 836 # Deletions 837 838 def permanently_delete( 839 self, 840 *, 841 cascade_delete_source: bool = False, 842 cascade_delete_destination: bool = False, 843 ) -> None: 844 """Delete the connection. 845 846 Args: 847 cascade_delete_source: Whether to also delete the source. 848 cascade_delete_destination: Whether to also delete the destination. 849 """ 850 self.workspace.permanently_delete_connection(self) 851 852 if cascade_delete_source: 853 self.workspace.permanently_delete_source(self.source_id) 854 855 if cascade_delete_destination: 856 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
38 def __init__( 39 self, 40 workspace: CloudWorkspace, 41 connection_id: str, 42 source: str | None = None, 43 destination: str | None = None, 44 ) -> None: 45 """It is not recommended to create a `CloudConnection` object directly. 46 47 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 48 """ 49 self.connection_id = connection_id 50 """The ID of the connection.""" 51 52 self.workspace = workspace 53 """The workspace that the connection belongs to.""" 54 55 self._source_id = source 56 """The ID of the source.""" 57 58 self._destination_id = destination 59 """The ID of the destination.""" 60 61 self._connection_info: ConnectionResponse | None = None 62 """The connection info object. (Cached.)""" 63 64 self._cloud_source_object: CloudSource | None = None 65 """The source object. (Cached.)""" 66 67 self._cloud_destination_object: CloudDestination | None = None 68 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
141 def check_is_valid(self) -> bool: 142 """Check if this connection exists and belongs to the expected workspace. 143 144 This method fetches connection info from the API (if not already cached) and 145 verifies that the connection's workspace_id matches the workspace associated 146 with this CloudConnection object. 147 148 Returns: 149 True if the connection exists and belongs to the expected workspace. 150 151 Raises: 152 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 153 AirbyteMissingResourceError: If the connection doesn't exist. 154 """ 155 self._fetch_connection_info(force_refresh=False, verify=True) 156 return True
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
176 @property 177 def name(self) -> str | None: 178 """Get the display name of the connection, if available. 179 180 E.g. "My Postgres to Snowflake", not the connection ID. 181 """ 182 if not self._connection_info: 183 self._connection_info = self._fetch_connection_info() 184 185 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
187 @property 188 def source_id(self) -> str: 189 """The ID of the source.""" 190 if not self._source_id: 191 if not self._connection_info: 192 self._connection_info = self._fetch_connection_info() 193 194 self._source_id = self._connection_info.source_id 195 196 return self._source_id
The ID of the source.
198 @property 199 def source(self) -> CloudSource: 200 """Get the source object.""" 201 if self._cloud_source_object: 202 return self._cloud_source_object 203 204 self._cloud_source_object = CloudSource( 205 workspace=self.workspace, 206 connector_id=self.source_id, 207 ) 208 return self._cloud_source_object
Get the source object.
210 @property 211 def destination_id(self) -> str: 212 """The ID of the destination.""" 213 if not self._destination_id: 214 if not self._connection_info: 215 self._connection_info = self._fetch_connection_info() 216 217 self._destination_id = self._connection_info.destination_id 218 219 return self._destination_id
The ID of the destination.
221 @property 222 def destination(self) -> CloudDestination: 223 """Get the destination object.""" 224 if self._cloud_destination_object: 225 return self._cloud_destination_object 226 227 self._cloud_destination_object = CloudDestination( 228 workspace=self.workspace, 229 connector_id=self.destination_id, 230 ) 231 return self._cloud_destination_object
Get the destination object.
233 @property 234 def stream_names(self) -> list[str]: 235 """The stream names.""" 236 if not self._connection_info: 237 self._connection_info = self._fetch_connection_info() 238 239 return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
241 @property 242 def table_prefix(self) -> str: 243 """The table prefix.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return self._connection_info.prefix or ""
The table prefix.
249 @property 250 def connection_url(self) -> str | None: 251 """The web URL to the connection.""" 252 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
254 @property 255 def job_history_url(self) -> str | None: 256 """The URL to the job history for the connection.""" 257 return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
261 def run_sync( 262 self, 263 *, 264 wait: bool = True, 265 wait_timeout: int = 300, 266 ) -> SyncResult: 267 """Run a sync.""" 268 connection_response = api_util.run_connection( 269 connection_id=self.connection_id, 270 api_root=self.workspace.api_root, 271 workspace_id=self.workspace.workspace_id, 272 client_id=self.workspace.client_id, 273 client_secret=self.workspace.client_secret, 274 bearer_token=self.workspace.bearer_token, 275 ) 276 sync_result = SyncResult( 277 workspace=self.workspace, 278 connection=self, 279 job_id=connection_response.job_id, 280 ) 281 282 if wait: 283 sync_result.wait_for_completion( 284 wait_timeout=wait_timeout, 285 raise_failure=True, 286 raise_timeout=True, 287 ) 288 289 return sync_result
Run a sync.
300 def get_previous_sync_logs( 301 self, 302 *, 303 limit: int = 20, 304 offset: int | None = None, 305 from_tail: bool = True, 306 job_type: JobTypeEnum | None = None, 307 ) -> list[SyncResult]: 308 """Get previous sync jobs for a connection with pagination support. 309 310 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 311 rows_synced, start_time). Full log text can be fetched lazily via 312 `SyncResult.get_full_log_text()`. 313 314 Args: 315 limit: Maximum number of jobs to return. Defaults to 20. 316 offset: Number of jobs to skip from the beginning. Defaults to None (0). 317 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 318 If False, returns jobs ordered oldest-first (createdAt ASC). 319 Defaults to True. 320 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 321 If not specified, defaults to sync and reset jobs only (API default behavior). 322 323 Returns: 324 A list of SyncResult objects representing the sync jobs. 325 """ 326 order_by = ( 327 api_util.JOB_ORDER_BY_CREATED_AT_DESC 328 if from_tail 329 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 330 ) 331 sync_logs: list[JobResponse] = api_util.get_job_logs( 332 connection_id=self.connection_id, 333 api_root=self.workspace.api_root, 334 workspace_id=self.workspace.workspace_id, 335 limit=limit, 336 offset=offset, 337 order_by=order_by, 338 job_type=job_type, 339 client_id=self.workspace.client_id, 340 client_secret=self.workspace.client_secret, 341 bearer_token=self.workspace.bearer_token, 342 ) 343 return [ 344 SyncResult( 345 workspace=self.workspace, 346 connection=self, 347 job_id=sync_log.job_id, 348 _latest_job_info=sync_log, 349 ) 350 for sync_log in sync_logs 351 ]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
SyncResult.get_full_log_text().
Args:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
- job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:
A list of SyncResult objects representing the sync jobs.
353 def get_sync_result( 354 self, 355 job_id: int | None = None, 356 ) -> SyncResult | None: 357 """Get the sync result for the connection. 358 359 If `job_id` is not provided, the most recent sync job will be used. 360 361 Returns `None` if job_id is omitted and no previous jobs are found. 362 """ 363 if job_id is None: 364 # Get the most recent sync job 365 results = self.get_previous_sync_logs( 366 limit=1, 367 ) 368 if results: 369 return results[0] 370 371 return None 372 373 # Get the sync job by ID (lazy loaded) 374 return SyncResult( 375 workspace=self.workspace, 376 connection=self, 377 job_id=job_id, 378 )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
382 @deprecated("Use 'dump_raw_state()' instead.") 383 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 384 """Deprecated. Use `dump_raw_state()` instead.""" 385 state_response = api_util.get_connection_state( 386 connection_id=self.connection_id, 387 api_root=self.workspace.api_root, 388 client_id=self.workspace.client_id, 389 client_secret=self.workspace.client_secret, 390 bearer_token=self.workspace.bearer_token, 391 ) 392 if state_response.get("stateType") == "not_set": 393 return None 394 return state_response.get("streamState", [])
Deprecated. Use dump_raw_state() instead.
396 def dump_raw_state(self) -> dict[str, Any]: 397 """Dump the full raw state for this connection. 398 399 Returns the connection's sync state as a raw dictionary from the API. 400 The result includes stateType, connectionId, and all state data. 401 402 The output of this method can be passed directly to `import_raw_state()` 403 on the same or a different connection (connectionId is overridden on import). 404 405 Returns: 406 The full connection state as a dictionary. 407 """ 408 return api_util.get_connection_state( 409 connection_id=self.connection_id, 410 api_root=self.workspace.api_root, 411 client_id=self.workspace.client_id, 412 client_secret=self.workspace.client_secret, 413 bearer_token=self.workspace.bearer_token, 414 )
Dump the full raw state for this connection.
Returns the connection's sync state as a raw dictionary from the API. The result includes stateType, connectionId, and all state data.
The output of this method can be passed directly to import_raw_state()
on the same or a different connection (connectionId is overridden on import).
Returns:
The full connection state as a dictionary.
416 def import_raw_state( 417 self, 418 connection_state_dict: dict[str, Any], 419 ) -> dict[str, Any]: 420 """Import (restore) the full raw state for this connection. 421 422 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 423 > could result in broken connections, and/or incorrect sync behavior. 424 425 Replaces the entire connection state with the provided state blob. 426 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 427 428 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 429 The `connectionId` in the blob is always overridden with this connection's 430 ID, making state blobs portable across connections. 431 432 Args: 433 connection_state_dict: The full connection state dict to import. Must include: 434 - stateType: "global", "stream", or "legacy" 435 - One of: state (legacy), streamState (stream), globalState (global) 436 437 Returns: 438 The updated connection state as a dictionary. 439 440 Raises: 441 AirbyteConnectionSyncActiveError: If a sync is currently running on this 442 connection (HTTP 423). Wait for the sync to complete before retrying. 443 """ 444 return api_util.replace_connection_state( 445 connection_id=self.connection_id, 446 connection_state_dict=connection_state_dict, 447 api_root=self.workspace.api_root, 448 client_id=self.workspace.client_id, 449 client_secret=self.workspace.client_secret, 450 bearer_token=self.workspace.bearer_token, 451 )
Import (restore) the full raw state for this connection.
⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
This is the counterpart to dump_raw_state() for backup/restore workflows.
The connectionId in the blob is always overridden with this connection's
ID, making state blobs portable across connections.
Arguments:
- connection_state_dict: The full connection state dict to import. Must include:
- stateType: "global", "stream", or "legacy"
- One of: state (legacy), streamState (stream), globalState (global)
Returns:
The updated connection state as a dictionary.
Raises:
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
453 def get_stream_state( 454 self, 455 stream_name: str, 456 stream_namespace: str | None = None, 457 ) -> dict[str, Any] | None: 458 """Get the state blob for a single stream within this connection. 459 460 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 461 not the full connection state envelope. 462 463 This is compatible with `stream`-type state and stream-level entries 464 within a `global`-type state. It is not compatible with `legacy` state. 465 To get or set the entire connection-level state artifact, use 466 `dump_raw_state` and `import_raw_state` instead. 467 468 Args: 469 stream_name: The name of the stream to get state for. 470 stream_namespace: The source-side stream namespace. This refers to the 471 namespace from the source (e.g., database schema), not any destination 472 namespace override set in connection advanced settings. 473 474 Returns: 475 The stream's state blob as a dictionary, or None if the stream is not found. 476 """ 477 state_data = self.dump_raw_state() 478 result = ConnectionStateResponse(**state_data) 479 480 streams = _get_stream_list(result) 481 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 482 483 if not matching: 484 available = [s.stream_descriptor.name for s in streams] 485 logger.warning( 486 "Stream '%s' not found in connection state for connection '%s'. " 487 "Available streams: %s", 488 stream_name, 489 self.connection_id, 490 available, 491 ) 492 return None 493 494 return matching[0].stream_state
Get the state blob for a single stream within this connection.
Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Arguments:
- stream_name: The name of the stream to get state for.
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:
The stream's state blob as a dictionary, or None if the stream is not found.
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        # Fetch the full state envelope once; both the raw dict and the parsed
        # model are needed below (raw for round-tripping, parsed for matching).
        state_data = self.dump_raw_state()
        current = ConnectionStateResponse(**state_data)

        # Stream-level updates require an existing stream-compatible state.
        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # New raw entry for this stream; the namespace key is included only
        # when a namespace was provided.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Locate the raw per-stream entries inside the envelope; their location
        # depends on the state type ("stream" vs "global").
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Walk raw and parsed entries in lockstep (same source data, same order):
        # match against the parsed model, but keep/replace the raw dicts so the
        # payload sent back is unmodified except for the targeted stream.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        # Stream not present yet: append it.
        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full envelope, preserving all other top-level keys.
        full_state: dict[str, Any] = {
            **state_data,
        }

        # Splice the updated stream list back into the type-appropriate slot.
        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        # Push the whole envelope back (safe variant; 423 while a sync runs).
        self.import_raw_state(full_state)
Set the state for a single stream within this connection.
Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Uses the safe variant that prevents updates while a sync is running (HTTP 423).
Arguments:
- stream_name: The name of the stream to update state for.
- state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
- PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
593 @deprecated("Use 'dump_raw_catalog()' instead.") 594 def get_catalog_artifact(self) -> dict[str, Any] | None: 595 """Get the configured catalog for this connection. 596 597 Returns the full configured catalog (syncCatalog) for this connection, 598 including stream schemas, sync modes, cursor fields, and primary keys. 599 600 Uses the Config API endpoint: POST /v1/web_backend/connections/get 601 602 Returns: 603 Dictionary containing the configured catalog, or `None` if not found. 604 """ 605 return self.dump_raw_catalog()
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or `None` if not found.
607 def dump_raw_catalog(self) -> dict[str, Any] | None: 608 """Dump the full configured catalog (syncCatalog) for this connection. 609 610 Returns the raw catalog dict as returned by the API, including stream 611 schemas, sync modes, cursor fields, and primary keys. 612 613 The returned dict can be passed to `import_raw_catalog()` on this or 614 another connection to restore or clone the catalog configuration. 615 616 Returns: 617 Dictionary containing the configured catalog, or `None` if not found. 618 """ 619 connection_response = api_util.get_connection_catalog( 620 connection_id=self.connection_id, 621 api_root=self.workspace.api_root, 622 client_id=self.workspace.client_id, 623 client_secret=self.workspace.client_secret, 624 bearer_token=self.workspace.bearer_token, 625 ) 626 return connection_response.get("syncCatalog")
Dump the full configured catalog (syncCatalog) for this connection.
Returns the raw catalog dict as returned by the API, including stream schemas, sync modes, cursor fields, and primary keys.
The returned dict can be passed to import_raw_catalog() on this or
another connection to restore or clone the catalog configuration.
Returns:
Dictionary containing the configured catalog, or `None` if not found.
628 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 629 """Replace the configured catalog for this connection. 630 631 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 632 > could result in broken connections, and/or incorrect sync behavior. 633 634 Accepts a configured catalog dict and replaces the connection's entire 635 catalog with it. All other connection settings remain unchanged. 636 637 The catalog shape should match the output of `dump_raw_catalog()`: 638 `{"streams": [{"stream": {...}, "config": {...}}, ...]}`. 639 640 Args: 641 catalog: The configured catalog dict to set. 642 """ 643 api_util.replace_connection_catalog( 644 connection_id=self.connection_id, 645 configured_catalog_dict=catalog, 646 api_root=self.workspace.api_root, 647 client_id=self.workspace.client_id, 648 client_secret=self.workspace.client_secret, 649 bearer_token=self.workspace.bearer_token, 650 )
Replace the configured catalog for this connection.
⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.
The catalog shape should match the output of dump_raw_catalog():
{"streams": [{"stream": {...}, "config": {...}}, ...]}.
Arguments:
- catalog: The configured catalog dict to set.
652 def rename(self, name: str) -> CloudConnection: 653 """Rename the connection. 654 655 Args: 656 name: New name for the connection 657 658 Returns: 659 Updated CloudConnection object with refreshed info 660 """ 661 updated_response = api_util.patch_connection( 662 connection_id=self.connection_id, 663 api_root=self.workspace.api_root, 664 client_id=self.workspace.client_id, 665 client_secret=self.workspace.client_secret, 666 bearer_token=self.workspace.bearer_token, 667 name=name, 668 ) 669 self._connection_info = updated_response 670 return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
672 def set_table_prefix(self, prefix: str) -> CloudConnection: 673 """Set the table prefix for the connection. 674 675 Args: 676 prefix: New table prefix to use when syncing to the destination 677 678 Returns: 679 Updated CloudConnection object with refreshed info 680 """ 681 updated_response = api_util.patch_connection( 682 connection_id=self.connection_id, 683 api_root=self.workspace.api_root, 684 client_id=self.workspace.client_id, 685 client_secret=self.workspace.client_secret, 686 bearer_token=self.workspace.bearer_token, 687 prefix=prefix, 688 ) 689 self._connection_info = updated_response 690 return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
692 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 693 """Set the selected streams for the connection. 694 695 This is a destructive operation that can break existing connections if the 696 stream selection is changed incorrectly. Use with caution. 697 698 Args: 699 stream_names: List of stream names to sync 700 701 Returns: 702 Updated CloudConnection object with refreshed info 703 """ 704 configurations = api_util.build_stream_configurations(stream_names) 705 706 updated_response = api_util.patch_connection( 707 connection_id=self.connection_id, 708 api_root=self.workspace.api_root, 709 client_id=self.workspace.client_id, 710 client_secret=self.workspace.client_secret, 711 bearer_token=self.workspace.bearer_token, 712 configurations=configurations, 713 ) 714 self._connection_info = updated_response 715 return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
719 @property 720 def enabled(self) -> bool: 721 """Get the current enabled status of the connection. 722 723 This property always fetches fresh data from the API to ensure accuracy, 724 as another process or user may have toggled the setting. 725 726 Returns: 727 True if the connection status is 'active', False otherwise. 728 """ 729 connection_info = self._fetch_connection_info(force_refresh=True) 730 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
742 def set_enabled( 743 self, 744 *, 745 enabled: bool, 746 ignore_noop: bool = True, 747 ) -> None: 748 """Set the enabled status of the connection. 749 750 Args: 751 enabled: True to enable (set status to 'active'), False to disable 752 (set status to 'inactive'). 753 ignore_noop: If True (default), silently return if the connection is already 754 in the requested state. If False, raise ValueError when the requested 755 state matches the current state. 756 757 Raises: 758 ValueError: If ignore_noop is False and the connection is already in the 759 requested state. 760 """ 761 # Always fetch fresh data to check current status 762 connection_info = self._fetch_connection_info(force_refresh=True) 763 current_status = connection_info.status 764 desired_status = ( 765 api_util.models.ConnectionStatusEnum.ACTIVE 766 if enabled 767 else api_util.models.ConnectionStatusEnum.INACTIVE 768 ) 769 770 if current_status == desired_status: 771 if ignore_noop: 772 return 773 raise ValueError( 774 f"Connection is already {'enabled' if enabled else 'disabled'}. " 775 f"Current status: {current_status}" 776 ) 777 778 updated_response = api_util.patch_connection( 779 connection_id=self.connection_id, 780 api_root=self.workspace.api_root, 781 client_id=self.workspace.client_id, 782 client_secret=self.workspace.client_secret, 783 bearer_token=self.workspace.bearer_token, 784 status=desired_status, 785 ) 786 self._connection_info = updated_response
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
790 def set_schedule( 791 self, 792 cron_expression: str, 793 ) -> None: 794 """Set a cron schedule for the connection. 795 796 Args: 797 cron_expression: A cron expression defining when syncs should run. 798 799 Examples: 800 - "0 0 * * *" - Daily at midnight UTC 801 - "0 */6 * * *" - Every 6 hours 802 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 803 """ 804 schedule = api_util.models.AirbyteAPIConnectionSchedule( 805 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 806 cron_expression=cron_expression, 807 ) 808 updated_response = api_util.patch_connection( 809 connection_id=self.connection_id, 810 api_root=self.workspace.api_root, 811 client_id=self.workspace.client_id, 812 client_secret=self.workspace.client_secret, 813 bearer_token=self.workspace.bearer_token, 814 schedule=schedule, 815 ) 816 self._connection_info = updated_response
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
818 def set_manual_schedule(self) -> None: 819 """Set the connection to manual scheduling. 820 821 Disables automatic syncs. Syncs will only run when manually triggered. 822 """ 823 schedule = api_util.models.AirbyteAPIConnectionSchedule( 824 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 825 ) 826 updated_response = api_util.patch_connection( 827 connection_id=self.connection_id, 828 api_root=self.workspace.api_root, 829 client_id=self.workspace.client_id, 830 client_secret=self.workspace.client_secret, 831 bearer_token=self.workspace.bearer_token, 832 schedule=schedule, 833 ) 834 self._connection_info = updated_response
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
838 def permanently_delete( 839 self, 840 *, 841 cascade_delete_source: bool = False, 842 cascade_delete_destination: bool = False, 843 ) -> None: 844 """Delete the connection. 845 846 Args: 847 cascade_delete_source: Whether to also delete the source. 848 cascade_delete_destination: Whether to also delete the destination. 849 """ 850 self.workspace.permanently_delete_connection(self) 851 852 if cascade_delete_source: 853 self.workspace.permanently_delete_source(self.source_id) 854 855 if cascade_delete_destination: 856 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.