# airbyte.cloud.connections — Cloud Connections.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Cloud Connections.""" 3 4from __future__ import annotations 5 6import logging 7from typing import TYPE_CHECKING, Any, Literal, overload 8 9from typing_extensions import deprecated 10 11from airbyte._util import api_util 12from airbyte.cloud._connection_catalog import ( 13 _denormalize_catalog_to_api, 14 _is_protocol_catalog_format, 15 _normalize_catalog_to_protocol, 16) 17from airbyte.cloud._connection_state import ( 18 ConnectionStateResponse, 19 _denormalize_protocol_state_to_api, 20 _get_stream_list, 21 _is_protocol_state_format, 22 _match_stream, 23 _normalize_state_to_protocol, 24) 25from airbyte.cloud.connectors import CloudDestination, CloudSource 26from airbyte.cloud.sync_results import SyncResult 27from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError 28 29 30logger = logging.getLogger(__name__) 31 32 33if TYPE_CHECKING: 34 from airbyte_api.models import ConnectionResponse, JobResponse, JobTypeEnum 35 36 from airbyte.cloud.workspaces import CloudWorkspace 37 38 39class CloudConnection: # noqa: PLR0904 # Too many public methods 40 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 41 42 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 43 """ 44 45 def __init__( 46 self, 47 workspace: CloudWorkspace, 48 connection_id: str, 49 source: str | None = None, 50 destination: str | None = None, 51 ) -> None: 52 """It is not recommended to create a `CloudConnection` object directly. 53 54 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 
55 """ 56 self.connection_id = connection_id 57 """The ID of the connection.""" 58 59 self.workspace = workspace 60 """The workspace that the connection belongs to.""" 61 62 self._source_id = source 63 """The ID of the source.""" 64 65 self._destination_id = destination 66 """The ID of the destination.""" 67 68 self._connection_info: ConnectionResponse | None = None 69 """The connection info object. (Cached.)""" 70 71 self._cloud_source_object: CloudSource | None = None 72 """The source object. (Cached.)""" 73 74 self._cloud_destination_object: CloudDestination | None = None 75 """The destination object. (Cached.)""" 76 77 def _fetch_connection_info( 78 self, 79 *, 80 force_refresh: bool = False, 81 verify: bool = True, 82 ) -> ConnectionResponse: 83 """Fetch and cache connection info from the API. 84 85 By default, this method will only fetch from the API if connection info is not 86 already cached. It also verifies that the connection belongs to the expected 87 workspace unless verification is explicitly disabled. 88 89 Args: 90 force_refresh: If True, always fetch from the API even if cached. 91 If False (default), only fetch if not already cached. 92 verify: If True (default), verify that the connection is valid (e.g., that 93 the workspace_id matches this object's workspace). Raises an error if 94 validation fails. 95 96 Returns: 97 The ConnectionResponse from the API. 98 99 Raises: 100 AirbyteWorkspaceMismatchError: If verify is True and the connection's 101 workspace_id doesn't match the expected workspace. 102 AirbyteMissingResourceError: If the connection doesn't exist. 
103 """ 104 if not force_refresh and self._connection_info is not None: 105 # Use cached info, but still verify if requested 106 if verify: 107 self._verify_workspace_match(self._connection_info) 108 return self._connection_info 109 110 # Fetch from API 111 connection_info = api_util.get_connection( 112 workspace_id=self.workspace.workspace_id, 113 connection_id=self.connection_id, 114 api_root=self.workspace.api_root, 115 client_id=self.workspace.client_id, 116 client_secret=self.workspace.client_secret, 117 bearer_token=self.workspace.bearer_token, 118 ) 119 120 # Cache the result first (before verification may raise) 121 self._connection_info = connection_info 122 123 # Verify if requested 124 if verify: 125 self._verify_workspace_match(connection_info) 126 127 return connection_info 128 129 def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None: 130 """Verify that the connection belongs to the expected workspace. 131 132 Raises: 133 AirbyteWorkspaceMismatchError: If the workspace IDs don't match. 134 """ 135 if connection_info.workspace_id != self.workspace.workspace_id: 136 raise AirbyteWorkspaceMismatchError( 137 resource_type="connection", 138 resource_id=self.connection_id, 139 workspace=self.workspace, 140 expected_workspace_id=self.workspace.workspace_id, 141 actual_workspace_id=connection_info.workspace_id, 142 message=( 143 f"Connection '{self.connection_id}' belongs to workspace " 144 f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'." 145 ), 146 ) 147 148 def check_is_valid(self) -> bool: 149 """Check if this connection exists and belongs to the expected workspace. 150 151 This method fetches connection info from the API (if not already cached) and 152 verifies that the connection's workspace_id matches the workspace associated 153 with this CloudConnection object. 154 155 Returns: 156 True if the connection exists and belongs to the expected workspace. 
157 158 Raises: 159 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 160 AirbyteMissingResourceError: If the connection doesn't exist. 161 """ 162 self._fetch_connection_info(force_refresh=False, verify=True) 163 return True 164 165 @classmethod 166 def _from_connection_response( 167 cls, 168 workspace: CloudWorkspace, 169 connection_response: ConnectionResponse, 170 ) -> CloudConnection: 171 """Create a CloudConnection from a ConnectionResponse.""" 172 result = cls( 173 workspace=workspace, 174 connection_id=connection_response.connection_id, 175 source=connection_response.source_id, 176 destination=connection_response.destination_id, 177 ) 178 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 179 return result 180 181 # Properties 182 183 @property 184 def name(self) -> str | None: 185 """Get the display name of the connection, if available. 186 187 E.g. "My Postgres to Snowflake", not the connection ID. 188 """ 189 if not self._connection_info: 190 self._connection_info = self._fetch_connection_info() 191 192 return self._connection_info.name 193 194 @property 195 def source_id(self) -> str: 196 """The ID of the source.""" 197 if not self._source_id: 198 if not self._connection_info: 199 self._connection_info = self._fetch_connection_info() 200 201 self._source_id = self._connection_info.source_id 202 203 return self._source_id 204 205 @property 206 def source(self) -> CloudSource: 207 """Get the source object.""" 208 if self._cloud_source_object: 209 return self._cloud_source_object 210 211 self._cloud_source_object = CloudSource( 212 workspace=self.workspace, 213 connector_id=self.source_id, 214 ) 215 return self._cloud_source_object 216 217 @property 218 def destination_id(self) -> str: 219 """The ID of the destination.""" 220 if not self._destination_id: 221 if not self._connection_info: 222 self._connection_info = self._fetch_connection_info() 223 224 self._destination_id = 
self._connection_info.destination_id 225 226 return self._destination_id 227 228 @property 229 def destination(self) -> CloudDestination: 230 """Get the destination object.""" 231 if self._cloud_destination_object: 232 return self._cloud_destination_object 233 234 self._cloud_destination_object = CloudDestination( 235 workspace=self.workspace, 236 connector_id=self.destination_id, 237 ) 238 return self._cloud_destination_object 239 240 @property 241 def stream_names(self) -> list[str]: 242 """The stream names.""" 243 if not self._connection_info: 244 self._connection_info = self._fetch_connection_info() 245 246 return [stream.name for stream in self._connection_info.configurations.streams or []] 247 248 @property 249 def table_prefix(self) -> str: 250 """The table prefix.""" 251 if not self._connection_info: 252 self._connection_info = self._fetch_connection_info() 253 254 return self._connection_info.prefix or "" 255 256 @property 257 def connection_url(self) -> str | None: 258 """The web URL to the connection.""" 259 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 260 261 @property 262 def job_history_url(self) -> str | None: 263 """The URL to the job history for the connection.""" 264 return f"{self.connection_url}/timeline" 265 266 # Run Sync 267 268 def run_sync( 269 self, 270 *, 271 wait: bool = True, 272 wait_timeout: int = 300, 273 ) -> SyncResult: 274 """Run a sync.""" 275 connection_response = api_util.run_connection( 276 connection_id=self.connection_id, 277 api_root=self.workspace.api_root, 278 workspace_id=self.workspace.workspace_id, 279 client_id=self.workspace.client_id, 280 client_secret=self.workspace.client_secret, 281 bearer_token=self.workspace.bearer_token, 282 ) 283 sync_result = SyncResult( 284 workspace=self.workspace, 285 connection=self, 286 job_id=connection_response.job_id, 287 ) 288 289 if wait: 290 sync_result.wait_for_completion( 291 wait_timeout=wait_timeout, 292 raise_failure=True, 293 raise_timeout=True, 
294 ) 295 296 return sync_result 297 298 def __repr__(self) -> str: 299 """String representation of the connection.""" 300 return ( 301 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 302 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 303 ) 304 305 # Logs 306 307 def get_previous_sync_logs( 308 self, 309 *, 310 limit: int = 20, 311 offset: int | None = None, 312 from_tail: bool = True, 313 job_type: JobTypeEnum | None = None, 314 ) -> list[SyncResult]: 315 """Get previous sync jobs for a connection with pagination support. 316 317 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 318 rows_synced, start_time). Full log text can be fetched lazily via 319 `SyncResult.get_full_log_text()`. 320 321 Args: 322 limit: Maximum number of jobs to return. Defaults to 20. 323 offset: Number of jobs to skip from the beginning. Defaults to None (0). 324 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 325 If False, returns jobs ordered oldest-first (createdAt ASC). 326 Defaults to True. 327 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 328 If not specified, defaults to sync and reset jobs only (API default behavior). 329 330 Returns: 331 A list of SyncResult objects representing the sync jobs. 
332 """ 333 order_by = ( 334 api_util.JOB_ORDER_BY_CREATED_AT_DESC 335 if from_tail 336 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 337 ) 338 sync_logs: list[JobResponse] = api_util.get_job_logs( 339 connection_id=self.connection_id, 340 api_root=self.workspace.api_root, 341 workspace_id=self.workspace.workspace_id, 342 limit=limit, 343 offset=offset, 344 order_by=order_by, 345 job_type=job_type, 346 client_id=self.workspace.client_id, 347 client_secret=self.workspace.client_secret, 348 bearer_token=self.workspace.bearer_token, 349 ) 350 return [ 351 SyncResult( 352 workspace=self.workspace, 353 connection=self, 354 job_id=sync_log.job_id, 355 _latest_job_info=sync_log, 356 ) 357 for sync_log in sync_logs 358 ] 359 360 def get_sync_result( 361 self, 362 job_id: int | None = None, 363 ) -> SyncResult | None: 364 """Get the sync result for the connection. 365 366 If `job_id` is not provided, the most recent sync job will be used. 367 368 Returns `None` if job_id is omitted and no previous jobs are found. 369 """ 370 if job_id is None: 371 # Get the most recent sync job 372 results = self.get_previous_sync_logs( 373 limit=1, 374 ) 375 if results: 376 return results[0] 377 378 return None 379 380 # Get the sync job by ID (lazy loaded) 381 return SyncResult( 382 workspace=self.workspace, 383 connection=self, 384 job_id=job_id, 385 ) 386 387 # Artifacts 388 389 @deprecated("Use 'dump_raw_state()' instead.") 390 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 391 """Deprecated. 
Use `dump_raw_state()` instead.""" 392 state_response = api_util.get_connection_state( 393 connection_id=self.connection_id, 394 api_root=self.workspace.api_root, 395 client_id=self.workspace.client_id, 396 client_secret=self.workspace.client_secret, 397 bearer_token=self.workspace.bearer_token, 398 ) 399 if state_response.get("stateType") == "not_set": 400 return None 401 return state_response.get("streamState", []) 402 403 @overload 404 def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ... 405 406 @overload 407 def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ... 408 409 def dump_raw_state( 410 self, 411 *, 412 normalize: bool = True, 413 ) -> dict[str, Any] | list[dict[str, Any]]: 414 """Dump the state for this connection. 415 416 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 417 with snake_case keys, suitable for passing to a connector's `--state` flag. 418 419 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 420 includes `stateType` and `connectionId`). This raw format can be passed 421 directly to `import_raw_state()` for backup/restore workflows. 422 423 Args: 424 normalize: If `True` (default), convert to Airbyte protocol format. 425 If `False`, return the raw Config API response. 426 427 Returns: 428 Normalized: list of protocol-format state message dicts (empty list if 429 no state). Raw: the full Config API state dict. 430 """ 431 raw = api_util.get_connection_state( 432 connection_id=self.connection_id, 433 api_root=self.workspace.api_root, 434 client_id=self.workspace.client_id, 435 client_secret=self.workspace.client_secret, 436 bearer_token=self.workspace.bearer_token, 437 ) 438 if normalize: 439 return _normalize_state_to_protocol(raw) 440 return raw 441 442 def import_raw_state( 443 self, 444 connection_state: dict[str, Any] | list[dict[str, Any]], 445 ) -> dict[str, Any]: 446 """Import (restore) the full state for this connection. 
447 448 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 449 > could result in broken connections, and/or incorrect sync behavior. 450 451 Replaces the entire connection state with the provided state blob. 452 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 453 454 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 455 The `connectionId` in the blob is always overridden with this connection's 456 ID, making state blobs portable across connections. 457 458 Accepts either format: 459 460 - **Config API format** (dict with `stateType`): passed through directly. 461 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 462 converted to Config API format before sending. 463 464 Args: 465 connection_state: Connection state in either Config API or Airbyte protocol format. 466 467 Returns: 468 The updated connection state as a dictionary. 469 470 Raises: 471 AirbyteConnectionSyncActiveError: If a sync is currently running on this 472 connection (HTTP 423). Wait for the sync to complete before retrying. 473 """ 474 api_state: dict[str, Any] 475 if isinstance(connection_state, list): 476 if not _is_protocol_state_format(connection_state): 477 msg = ( 478 "Expected connection_state list to contain Airbyte protocol state " 479 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 480 "or LEGACY). Got a list that does not match protocol format." 
481 ) 482 raise ValueError(msg) 483 api_state = _denormalize_protocol_state_to_api( 484 protocol_messages=connection_state, 485 connection_id=self.connection_id, 486 ) 487 elif isinstance(connection_state, dict): 488 if _is_protocol_state_format(connection_state): 489 api_state = _denormalize_protocol_state_to_api( 490 protocol_messages=[connection_state], 491 connection_id=self.connection_id, 492 ) 493 else: 494 api_state = connection_state 495 else: 496 msg = f"Expected a dict or list, got {type(connection_state)}" 497 raise TypeError(msg) 498 499 return api_util.replace_connection_state( 500 connection_id=self.connection_id, 501 connection_state_dict=api_state, 502 api_root=self.workspace.api_root, 503 client_id=self.workspace.client_id, 504 client_secret=self.workspace.client_secret, 505 bearer_token=self.workspace.bearer_token, 506 ) 507 508 def get_stream_state( 509 self, 510 stream_name: str, 511 stream_namespace: str | None = None, 512 ) -> dict[str, Any] | None: 513 """Get the state blob for a single stream within this connection. 514 515 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 516 not the full connection state envelope. 517 518 This is compatible with `stream`-type state and stream-level entries 519 within a `global`-type state. It is not compatible with `legacy` state. 520 To get or set the entire connection-level state artifact, use 521 `dump_raw_state` and `import_raw_state` instead. 522 523 Args: 524 stream_name: The name of the stream to get state for. 525 stream_namespace: The source-side stream namespace. This refers to the 526 namespace from the source (e.g., database schema), not any destination 527 namespace override set in connection advanced settings. 528 529 Returns: 530 The stream's state blob as a dictionary, or None if the stream is not found. 
531 """ 532 state_data = self.dump_raw_state(normalize=False) 533 result = ConnectionStateResponse(**state_data) 534 535 streams = _get_stream_list(result) 536 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 537 538 if not matching: 539 available = [s.stream_descriptor.name for s in streams] 540 logger.warning( 541 "Stream '%s' not found in connection state for connection '%s'. " 542 "Available streams: %s", 543 stream_name, 544 self.connection_id, 545 available, 546 ) 547 return None 548 549 return matching[0].stream_state 550 551 def set_stream_state( 552 self, 553 stream_name: str, 554 state_blob_dict: dict[str, Any], 555 stream_namespace: str | None = None, 556 ) -> None: 557 """Set the state for a single stream within this connection. 558 559 Fetches the current full state, replaces only the specified stream's state, 560 then sends the full updated state back to the API. If the stream does not 561 exist in the current state, it is appended. 562 563 This is compatible with `stream`-type state and stream-level entries 564 within a `global`-type state. It is not compatible with `legacy` state. 565 To get or set the entire connection-level state artifact, use 566 `dump_raw_state` and `import_raw_state` instead. 567 568 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 569 570 Args: 571 stream_name: The name of the stream to update state for. 572 state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}). 573 stream_namespace: The source-side stream namespace. This refers to the 574 namespace from the source (e.g., database schema), not any destination 575 namespace override set in connection advanced settings. 576 577 Raises: 578 PyAirbyteInputError: If the connection state type is not supported for 579 stream-level operations (not_set, legacy). 580 AirbyteConnectionSyncActiveError: If a sync is currently running on this 581 connection (HTTP 423). 
Wait for the sync to complete before retrying. 582 """ 583 state_data = self.dump_raw_state(normalize=False) 584 current = ConnectionStateResponse(**state_data) 585 586 if current.state_type == "not_set": 587 raise PyAirbyteInputError( 588 message="Cannot set stream state: connection has no existing state.", 589 context={"connection_id": self.connection_id}, 590 ) 591 592 if current.state_type == "legacy": 593 raise PyAirbyteInputError( 594 message="Cannot set stream state on a legacy-type connection state.", 595 context={"connection_id": self.connection_id}, 596 ) 597 598 new_stream_entry = { 599 "streamDescriptor": { 600 "name": stream_name, 601 **( 602 { 603 "namespace": stream_namespace, 604 } 605 if stream_namespace 606 else {} 607 ), 608 }, 609 "streamState": state_blob_dict, 610 } 611 612 raw_streams: list[dict[str, Any]] 613 if current.state_type == "stream": 614 raw_streams = state_data.get("streamState", []) 615 elif current.state_type == "global": 616 raw_streams = state_data.get("globalState", {}).get("streamStates", []) 617 else: 618 raw_streams = [] 619 620 streams = _get_stream_list(current) 621 found = False 622 updated_streams_raw: list[dict[str, Any]] = [] 623 for raw_s, parsed_s in zip(raw_streams, streams, strict=False): 624 if _match_stream(parsed_s, stream_name, stream_namespace): 625 updated_streams_raw.append(new_stream_entry) 626 found = True 627 else: 628 updated_streams_raw.append(raw_s) 629 630 if not found: 631 updated_streams_raw.append(new_stream_entry) 632 633 full_state: dict[str, Any] = { 634 **state_data, 635 } 636 637 if current.state_type == "stream": 638 full_state["streamState"] = updated_streams_raw 639 elif current.state_type == "global": 640 original_global = state_data.get("globalState", {}) 641 full_state["globalState"] = { 642 **original_global, 643 "streamStates": updated_streams_raw, 644 } 645 646 self.import_raw_state(full_state) 647 648 @deprecated("Use 'dump_raw_catalog()' instead.") 649 def 
get_catalog_artifact(self) -> dict[str, Any] | None: 650 """Get the configured catalog for this connection. 651 652 Returns the full configured catalog (syncCatalog) for this connection, 653 including stream schemas, sync modes, cursor fields, and primary keys. 654 655 Uses the Config API endpoint: POST /v1/web_backend/connections/get 656 657 Returns: 658 Dictionary containing the configured catalog, or `None` if not found. 659 """ 660 return self.dump_raw_catalog() 661 662 def dump_raw_catalog( 663 self, 664 *, 665 normalize: bool = True, 666 ) -> dict[str, Any] | None: 667 """Dump the configured catalog for this connection. 668 669 By default, returns the catalog in Airbyte protocol format 670 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 671 to a connector's `--catalog` flag. 672 673 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 674 Config API (camelCase keys, nested `config` block). This raw format can be 675 passed directly to `import_raw_catalog()` for backup/restore workflows. 676 677 Args: 678 normalize: If `True` (default), convert to Airbyte protocol format. 679 If `False`, return the raw Config API catalog. 680 681 Returns: 682 The configured catalog dict, or `None` if not found. 683 """ 684 connection_response = api_util.get_connection_catalog( 685 connection_id=self.connection_id, 686 api_root=self.workspace.api_root, 687 client_id=self.workspace.client_id, 688 client_secret=self.workspace.client_secret, 689 bearer_token=self.workspace.bearer_token, 690 ) 691 raw = connection_response.get("syncCatalog") 692 if raw is None: 693 return None 694 if normalize: 695 return _normalize_catalog_to_protocol(raw) 696 return raw 697 698 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 699 """Replace the configured catalog for this connection. 700 701 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 702 > could result in broken connections, and/or incorrect sync behavior. 
703 704 Accepts a configured catalog dict and replaces the connection's entire 705 catalog with it. All other connection settings remain unchanged. 706 707 Accepts either format: 708 709 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 710 passed through directly. 711 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 712 automatically converted to Config API format before sending. 713 714 Args: 715 catalog: The configured catalog dict in either format. 716 """ 717 if _is_protocol_catalog_format(catalog): 718 catalog = _denormalize_catalog_to_api(catalog) 719 720 api_util.replace_connection_catalog( 721 connection_id=self.connection_id, 722 configured_catalog_dict=catalog, 723 api_root=self.workspace.api_root, 724 client_id=self.workspace.client_id, 725 client_secret=self.workspace.client_secret, 726 bearer_token=self.workspace.bearer_token, 727 ) 728 729 def rename(self, name: str) -> CloudConnection: 730 """Rename the connection. 731 732 Args: 733 name: New name for the connection 734 735 Returns: 736 Updated CloudConnection object with refreshed info 737 """ 738 updated_response = api_util.patch_connection( 739 connection_id=self.connection_id, 740 api_root=self.workspace.api_root, 741 client_id=self.workspace.client_id, 742 client_secret=self.workspace.client_secret, 743 bearer_token=self.workspace.bearer_token, 744 name=name, 745 ) 746 self._connection_info = updated_response 747 return self 748 749 def set_table_prefix(self, prefix: str) -> CloudConnection: 750 """Set the table prefix for the connection. 
751 752 Args: 753 prefix: New table prefix to use when syncing to the destination 754 755 Returns: 756 Updated CloudConnection object with refreshed info 757 """ 758 updated_response = api_util.patch_connection( 759 connection_id=self.connection_id, 760 api_root=self.workspace.api_root, 761 client_id=self.workspace.client_id, 762 client_secret=self.workspace.client_secret, 763 bearer_token=self.workspace.bearer_token, 764 prefix=prefix, 765 ) 766 self._connection_info = updated_response 767 return self 768 769 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 770 """Set the selected streams for the connection. 771 772 This is a destructive operation that can break existing connections if the 773 stream selection is changed incorrectly. Use with caution. 774 775 Args: 776 stream_names: List of stream names to sync 777 778 Returns: 779 Updated CloudConnection object with refreshed info 780 """ 781 configurations = api_util.build_stream_configurations(stream_names) 782 783 updated_response = api_util.patch_connection( 784 connection_id=self.connection_id, 785 api_root=self.workspace.api_root, 786 client_id=self.workspace.client_id, 787 client_secret=self.workspace.client_secret, 788 bearer_token=self.workspace.bearer_token, 789 configurations=configurations, 790 ) 791 self._connection_info = updated_response 792 return self 793 794 # Enable/Disable 795 796 @property 797 def enabled(self) -> bool: 798 """Get the current enabled status of the connection. 799 800 This property always fetches fresh data from the API to ensure accuracy, 801 as another process or user may have toggled the setting. 802 803 Returns: 804 True if the connection status is 'active', False otherwise. 805 """ 806 connection_info = self._fetch_connection_info(force_refresh=True) 807 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE 808 809 @enabled.setter 810 def enabled(self, value: bool) -> None: 811 """Set the enabled status of the connection. 
812 813 Args: 814 value: True to enable (set status to 'active'), False to disable 815 (set status to 'inactive'). 816 """ 817 self.set_enabled(enabled=value) 818 819 def set_enabled( 820 self, 821 *, 822 enabled: bool, 823 ignore_noop: bool = True, 824 ) -> None: 825 """Set the enabled status of the connection. 826 827 Args: 828 enabled: True to enable (set status to 'active'), False to disable 829 (set status to 'inactive'). 830 ignore_noop: If True (default), silently return if the connection is already 831 in the requested state. If False, raise ValueError when the requested 832 state matches the current state. 833 834 Raises: 835 ValueError: If ignore_noop is False and the connection is already in the 836 requested state. 837 """ 838 # Always fetch fresh data to check current status 839 connection_info = self._fetch_connection_info(force_refresh=True) 840 current_status = connection_info.status 841 desired_status = ( 842 api_util.models.ConnectionStatusEnum.ACTIVE 843 if enabled 844 else api_util.models.ConnectionStatusEnum.INACTIVE 845 ) 846 847 if current_status == desired_status: 848 if ignore_noop: 849 return 850 raise ValueError( 851 f"Connection is already {'enabled' if enabled else 'disabled'}. " 852 f"Current status: {current_status}" 853 ) 854 855 updated_response = api_util.patch_connection( 856 connection_id=self.connection_id, 857 api_root=self.workspace.api_root, 858 client_id=self.workspace.client_id, 859 client_secret=self.workspace.client_secret, 860 bearer_token=self.workspace.bearer_token, 861 status=desired_status, 862 ) 863 self._connection_info = updated_response 864 865 # Scheduling 866 867 def set_schedule( 868 self, 869 cron_expression: str, 870 ) -> None: 871 """Set a cron schedule for the connection. 872 873 Args: 874 cron_expression: A cron expression defining when syncs should run. 
875 876 Examples: 877 - "0 0 * * *" - Daily at midnight UTC 878 - "0 */6 * * *" - Every 6 hours 879 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 880 """ 881 schedule = api_util.models.AirbyteAPIConnectionSchedule( 882 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 883 cron_expression=cron_expression, 884 ) 885 updated_response = api_util.patch_connection( 886 connection_id=self.connection_id, 887 api_root=self.workspace.api_root, 888 client_id=self.workspace.client_id, 889 client_secret=self.workspace.client_secret, 890 bearer_token=self.workspace.bearer_token, 891 schedule=schedule, 892 ) 893 self._connection_info = updated_response 894 895 def set_manual_schedule(self) -> None: 896 """Set the connection to manual scheduling. 897 898 Disables automatic syncs. Syncs will only run when manually triggered. 899 """ 900 schedule = api_util.models.AirbyteAPIConnectionSchedule( 901 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 902 ) 903 updated_response = api_util.patch_connection( 904 connection_id=self.connection_id, 905 api_root=self.workspace.api_root, 906 client_id=self.workspace.client_id, 907 client_secret=self.workspace.client_secret, 908 bearer_token=self.workspace.bearer_token, 909 schedule=schedule, 910 ) 911 self._connection_info = updated_response 912 913 # Deletions 914 915 def permanently_delete( 916 self, 917 *, 918 cascade_delete_source: bool = False, 919 cascade_delete_destination: bool = False, 920 ) -> None: 921 """Delete the connection. 922 923 Args: 924 cascade_delete_source: Whether to also delete the source. 925 cascade_delete_destination: Whether to also delete the destination. 926 """ 927 self.workspace.permanently_delete_connection(self) 928 929 if cascade_delete_source: 930 self.workspace.permanently_delete_source(self.source_id) 931 932 if cascade_delete_destination: 933 self.workspace.permanently_delete_destination(self.destination_id)
40class CloudConnection: # noqa: PLR0904 # Too many public methods 41 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 42 43 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 44 """ 45 46 def __init__( 47 self, 48 workspace: CloudWorkspace, 49 connection_id: str, 50 source: str | None = None, 51 destination: str | None = None, 52 ) -> None: 53 """It is not recommended to create a `CloudConnection` object directly. 54 55 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 56 """ 57 self.connection_id = connection_id 58 """The ID of the connection.""" 59 60 self.workspace = workspace 61 """The workspace that the connection belongs to.""" 62 63 self._source_id = source 64 """The ID of the source.""" 65 66 self._destination_id = destination 67 """The ID of the destination.""" 68 69 self._connection_info: ConnectionResponse | None = None 70 """The connection info object. (Cached.)""" 71 72 self._cloud_source_object: CloudSource | None = None 73 """The source object. (Cached.)""" 74 75 self._cloud_destination_object: CloudDestination | None = None 76 """The destination object. (Cached.)""" 77 78 def _fetch_connection_info( 79 self, 80 *, 81 force_refresh: bool = False, 82 verify: bool = True, 83 ) -> ConnectionResponse: 84 """Fetch and cache connection info from the API. 85 86 By default, this method will only fetch from the API if connection info is not 87 already cached. It also verifies that the connection belongs to the expected 88 workspace unless verification is explicitly disabled. 89 90 Args: 91 force_refresh: If True, always fetch from the API even if cached. 92 If False (default), only fetch if not already cached. 93 verify: If True (default), verify that the connection is valid (e.g., that 94 the workspace_id matches this object's workspace). Raises an error if 95 validation fails. 96 97 Returns: 98 The ConnectionResponse from the API. 
99 100 Raises: 101 AirbyteWorkspaceMismatchError: If verify is True and the connection's 102 workspace_id doesn't match the expected workspace. 103 AirbyteMissingResourceError: If the connection doesn't exist. 104 """ 105 if not force_refresh and self._connection_info is not None: 106 # Use cached info, but still verify if requested 107 if verify: 108 self._verify_workspace_match(self._connection_info) 109 return self._connection_info 110 111 # Fetch from API 112 connection_info = api_util.get_connection( 113 workspace_id=self.workspace.workspace_id, 114 connection_id=self.connection_id, 115 api_root=self.workspace.api_root, 116 client_id=self.workspace.client_id, 117 client_secret=self.workspace.client_secret, 118 bearer_token=self.workspace.bearer_token, 119 ) 120 121 # Cache the result first (before verification may raise) 122 self._connection_info = connection_info 123 124 # Verify if requested 125 if verify: 126 self._verify_workspace_match(connection_info) 127 128 return connection_info 129 130 def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None: 131 """Verify that the connection belongs to the expected workspace. 132 133 Raises: 134 AirbyteWorkspaceMismatchError: If the workspace IDs don't match. 135 """ 136 if connection_info.workspace_id != self.workspace.workspace_id: 137 raise AirbyteWorkspaceMismatchError( 138 resource_type="connection", 139 resource_id=self.connection_id, 140 workspace=self.workspace, 141 expected_workspace_id=self.workspace.workspace_id, 142 actual_workspace_id=connection_info.workspace_id, 143 message=( 144 f"Connection '{self.connection_id}' belongs to workspace " 145 f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'." 146 ), 147 ) 148 149 def check_is_valid(self) -> bool: 150 """Check if this connection exists and belongs to the expected workspace. 
151 152 This method fetches connection info from the API (if not already cached) and 153 verifies that the connection's workspace_id matches the workspace associated 154 with this CloudConnection object. 155 156 Returns: 157 True if the connection exists and belongs to the expected workspace. 158 159 Raises: 160 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 161 AirbyteMissingResourceError: If the connection doesn't exist. 162 """ 163 self._fetch_connection_info(force_refresh=False, verify=True) 164 return True 165 166 @classmethod 167 def _from_connection_response( 168 cls, 169 workspace: CloudWorkspace, 170 connection_response: ConnectionResponse, 171 ) -> CloudConnection: 172 """Create a CloudConnection from a ConnectionResponse.""" 173 result = cls( 174 workspace=workspace, 175 connection_id=connection_response.connection_id, 176 source=connection_response.source_id, 177 destination=connection_response.destination_id, 178 ) 179 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 180 return result 181 182 # Properties 183 184 @property 185 def name(self) -> str | None: 186 """Get the display name of the connection, if available. 187 188 E.g. "My Postgres to Snowflake", not the connection ID. 
189 """ 190 if not self._connection_info: 191 self._connection_info = self._fetch_connection_info() 192 193 return self._connection_info.name 194 195 @property 196 def source_id(self) -> str: 197 """The ID of the source.""" 198 if not self._source_id: 199 if not self._connection_info: 200 self._connection_info = self._fetch_connection_info() 201 202 self._source_id = self._connection_info.source_id 203 204 return self._source_id 205 206 @property 207 def source(self) -> CloudSource: 208 """Get the source object.""" 209 if self._cloud_source_object: 210 return self._cloud_source_object 211 212 self._cloud_source_object = CloudSource( 213 workspace=self.workspace, 214 connector_id=self.source_id, 215 ) 216 return self._cloud_source_object 217 218 @property 219 def destination_id(self) -> str: 220 """The ID of the destination.""" 221 if not self._destination_id: 222 if not self._connection_info: 223 self._connection_info = self._fetch_connection_info() 224 225 self._destination_id = self._connection_info.destination_id 226 227 return self._destination_id 228 229 @property 230 def destination(self) -> CloudDestination: 231 """Get the destination object.""" 232 if self._cloud_destination_object: 233 return self._cloud_destination_object 234 235 self._cloud_destination_object = CloudDestination( 236 workspace=self.workspace, 237 connector_id=self.destination_id, 238 ) 239 return self._cloud_destination_object 240 241 @property 242 def stream_names(self) -> list[str]: 243 """The stream names.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return [stream.name for stream in self._connection_info.configurations.streams or []] 248 249 @property 250 def table_prefix(self) -> str: 251 """The table prefix.""" 252 if not self._connection_info: 253 self._connection_info = self._fetch_connection_info() 254 255 return self._connection_info.prefix or "" 256 257 @property 258 def connection_url(self) -> str | None: 259 """The web 
URL to the connection.""" 260 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 261 262 @property 263 def job_history_url(self) -> str | None: 264 """The URL to the job history for the connection.""" 265 return f"{self.connection_url}/timeline" 266 267 # Run Sync 268 269 def run_sync( 270 self, 271 *, 272 wait: bool = True, 273 wait_timeout: int = 300, 274 ) -> SyncResult: 275 """Run a sync.""" 276 connection_response = api_util.run_connection( 277 connection_id=self.connection_id, 278 api_root=self.workspace.api_root, 279 workspace_id=self.workspace.workspace_id, 280 client_id=self.workspace.client_id, 281 client_secret=self.workspace.client_secret, 282 bearer_token=self.workspace.bearer_token, 283 ) 284 sync_result = SyncResult( 285 workspace=self.workspace, 286 connection=self, 287 job_id=connection_response.job_id, 288 ) 289 290 if wait: 291 sync_result.wait_for_completion( 292 wait_timeout=wait_timeout, 293 raise_failure=True, 294 raise_timeout=True, 295 ) 296 297 return sync_result 298 299 def __repr__(self) -> str: 300 """String representation of the connection.""" 301 return ( 302 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 303 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 304 ) 305 306 # Logs 307 308 def get_previous_sync_logs( 309 self, 310 *, 311 limit: int = 20, 312 offset: int | None = None, 313 from_tail: bool = True, 314 job_type: JobTypeEnum | None = None, 315 ) -> list[SyncResult]: 316 """Get previous sync jobs for a connection with pagination support. 317 318 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 319 rows_synced, start_time). Full log text can be fetched lazily via 320 `SyncResult.get_full_log_text()`. 321 322 Args: 323 limit: Maximum number of jobs to return. Defaults to 20. 324 offset: Number of jobs to skip from the beginning. Defaults to None (0). 
325 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 326 If False, returns jobs ordered oldest-first (createdAt ASC). 327 Defaults to True. 328 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 329 If not specified, defaults to sync and reset jobs only (API default behavior). 330 331 Returns: 332 A list of SyncResult objects representing the sync jobs. 333 """ 334 order_by = ( 335 api_util.JOB_ORDER_BY_CREATED_AT_DESC 336 if from_tail 337 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 338 ) 339 sync_logs: list[JobResponse] = api_util.get_job_logs( 340 connection_id=self.connection_id, 341 api_root=self.workspace.api_root, 342 workspace_id=self.workspace.workspace_id, 343 limit=limit, 344 offset=offset, 345 order_by=order_by, 346 job_type=job_type, 347 client_id=self.workspace.client_id, 348 client_secret=self.workspace.client_secret, 349 bearer_token=self.workspace.bearer_token, 350 ) 351 return [ 352 SyncResult( 353 workspace=self.workspace, 354 connection=self, 355 job_id=sync_log.job_id, 356 _latest_job_info=sync_log, 357 ) 358 for sync_log in sync_logs 359 ] 360 361 def get_sync_result( 362 self, 363 job_id: int | None = None, 364 ) -> SyncResult | None: 365 """Get the sync result for the connection. 366 367 If `job_id` is not provided, the most recent sync job will be used. 368 369 Returns `None` if job_id is omitted and no previous jobs are found. 370 """ 371 if job_id is None: 372 # Get the most recent sync job 373 results = self.get_previous_sync_logs( 374 limit=1, 375 ) 376 if results: 377 return results[0] 378 379 return None 380 381 # Get the sync job by ID (lazy loaded) 382 return SyncResult( 383 workspace=self.workspace, 384 connection=self, 385 job_id=job_id, 386 ) 387 388 # Artifacts 389 390 @deprecated("Use 'dump_raw_state()' instead.") 391 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 392 """Deprecated. 
Use `dump_raw_state()` instead.""" 393 state_response = api_util.get_connection_state( 394 connection_id=self.connection_id, 395 api_root=self.workspace.api_root, 396 client_id=self.workspace.client_id, 397 client_secret=self.workspace.client_secret, 398 bearer_token=self.workspace.bearer_token, 399 ) 400 if state_response.get("stateType") == "not_set": 401 return None 402 return state_response.get("streamState", []) 403 404 @overload 405 def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ... 406 407 @overload 408 def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ... 409 410 def dump_raw_state( 411 self, 412 *, 413 normalize: bool = True, 414 ) -> dict[str, Any] | list[dict[str, Any]]: 415 """Dump the state for this connection. 416 417 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 418 with snake_case keys, suitable for passing to a connector's `--state` flag. 419 420 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 421 includes `stateType` and `connectionId`). This raw format can be passed 422 directly to `import_raw_state()` for backup/restore workflows. 423 424 Args: 425 normalize: If `True` (default), convert to Airbyte protocol format. 426 If `False`, return the raw Config API response. 427 428 Returns: 429 Normalized: list of protocol-format state message dicts (empty list if 430 no state). Raw: the full Config API state dict. 431 """ 432 raw = api_util.get_connection_state( 433 connection_id=self.connection_id, 434 api_root=self.workspace.api_root, 435 client_id=self.workspace.client_id, 436 client_secret=self.workspace.client_secret, 437 bearer_token=self.workspace.bearer_token, 438 ) 439 if normalize: 440 return _normalize_state_to_protocol(raw) 441 return raw 442 443 def import_raw_state( 444 self, 445 connection_state: dict[str, Any] | list[dict[str, Any]], 446 ) -> dict[str, Any]: 447 """Import (restore) the full state for this connection. 
448 449 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 450 > could result in broken connections, and/or incorrect sync behavior. 451 452 Replaces the entire connection state with the provided state blob. 453 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 454 455 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 456 The `connectionId` in the blob is always overridden with this connection's 457 ID, making state blobs portable across connections. 458 459 Accepts either format: 460 461 - **Config API format** (dict with `stateType`): passed through directly. 462 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 463 converted to Config API format before sending. 464 465 Args: 466 connection_state: Connection state in either Config API or Airbyte protocol format. 467 468 Returns: 469 The updated connection state as a dictionary. 470 471 Raises: 472 AirbyteConnectionSyncActiveError: If a sync is currently running on this 473 connection (HTTP 423). Wait for the sync to complete before retrying. 474 """ 475 api_state: dict[str, Any] 476 if isinstance(connection_state, list): 477 if not _is_protocol_state_format(connection_state): 478 msg = ( 479 "Expected connection_state list to contain Airbyte protocol state " 480 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 481 "or LEGACY). Got a list that does not match protocol format." 
482 ) 483 raise ValueError(msg) 484 api_state = _denormalize_protocol_state_to_api( 485 protocol_messages=connection_state, 486 connection_id=self.connection_id, 487 ) 488 elif isinstance(connection_state, dict): 489 if _is_protocol_state_format(connection_state): 490 api_state = _denormalize_protocol_state_to_api( 491 protocol_messages=[connection_state], 492 connection_id=self.connection_id, 493 ) 494 else: 495 api_state = connection_state 496 else: 497 msg = f"Expected a dict or list, got {type(connection_state)}" 498 raise TypeError(msg) 499 500 return api_util.replace_connection_state( 501 connection_id=self.connection_id, 502 connection_state_dict=api_state, 503 api_root=self.workspace.api_root, 504 client_id=self.workspace.client_id, 505 client_secret=self.workspace.client_secret, 506 bearer_token=self.workspace.bearer_token, 507 ) 508 509 def get_stream_state( 510 self, 511 stream_name: str, 512 stream_namespace: str | None = None, 513 ) -> dict[str, Any] | None: 514 """Get the state blob for a single stream within this connection. 515 516 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 517 not the full connection state envelope. 518 519 This is compatible with `stream`-type state and stream-level entries 520 within a `global`-type state. It is not compatible with `legacy` state. 521 To get or set the entire connection-level state artifact, use 522 `dump_raw_state` and `import_raw_state` instead. 523 524 Args: 525 stream_name: The name of the stream to get state for. 526 stream_namespace: The source-side stream namespace. This refers to the 527 namespace from the source (e.g., database schema), not any destination 528 namespace override set in connection advanced settings. 529 530 Returns: 531 The stream's state blob as a dictionary, or None if the stream is not found. 
532 """ 533 state_data = self.dump_raw_state(normalize=False) 534 result = ConnectionStateResponse(**state_data) 535 536 streams = _get_stream_list(result) 537 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 538 539 if not matching: 540 available = [s.stream_descriptor.name for s in streams] 541 logger.warning( 542 "Stream '%s' not found in connection state for connection '%s'. " 543 "Available streams: %s", 544 stream_name, 545 self.connection_id, 546 available, 547 ) 548 return None 549 550 return matching[0].stream_state 551 552 def set_stream_state( 553 self, 554 stream_name: str, 555 state_blob_dict: dict[str, Any], 556 stream_namespace: str | None = None, 557 ) -> None: 558 """Set the state for a single stream within this connection. 559 560 Fetches the current full state, replaces only the specified stream's state, 561 then sends the full updated state back to the API. If the stream does not 562 exist in the current state, it is appended. 563 564 This is compatible with `stream`-type state and stream-level entries 565 within a `global`-type state. It is not compatible with `legacy` state. 566 To get or set the entire connection-level state artifact, use 567 `dump_raw_state` and `import_raw_state` instead. 568 569 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 570 571 Args: 572 stream_name: The name of the stream to update state for. 573 state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}). 574 stream_namespace: The source-side stream namespace. This refers to the 575 namespace from the source (e.g., database schema), not any destination 576 namespace override set in connection advanced settings. 577 578 Raises: 579 PyAirbyteInputError: If the connection state type is not supported for 580 stream-level operations (not_set, legacy). 581 AirbyteConnectionSyncActiveError: If a sync is currently running on this 582 connection (HTTP 423). 
Wait for the sync to complete before retrying. 583 """ 584 state_data = self.dump_raw_state(normalize=False) 585 current = ConnectionStateResponse(**state_data) 586 587 if current.state_type == "not_set": 588 raise PyAirbyteInputError( 589 message="Cannot set stream state: connection has no existing state.", 590 context={"connection_id": self.connection_id}, 591 ) 592 593 if current.state_type == "legacy": 594 raise PyAirbyteInputError( 595 message="Cannot set stream state on a legacy-type connection state.", 596 context={"connection_id": self.connection_id}, 597 ) 598 599 new_stream_entry = { 600 "streamDescriptor": { 601 "name": stream_name, 602 **( 603 { 604 "namespace": stream_namespace, 605 } 606 if stream_namespace 607 else {} 608 ), 609 }, 610 "streamState": state_blob_dict, 611 } 612 613 raw_streams: list[dict[str, Any]] 614 if current.state_type == "stream": 615 raw_streams = state_data.get("streamState", []) 616 elif current.state_type == "global": 617 raw_streams = state_data.get("globalState", {}).get("streamStates", []) 618 else: 619 raw_streams = [] 620 621 streams = _get_stream_list(current) 622 found = False 623 updated_streams_raw: list[dict[str, Any]] = [] 624 for raw_s, parsed_s in zip(raw_streams, streams, strict=False): 625 if _match_stream(parsed_s, stream_name, stream_namespace): 626 updated_streams_raw.append(new_stream_entry) 627 found = True 628 else: 629 updated_streams_raw.append(raw_s) 630 631 if not found: 632 updated_streams_raw.append(new_stream_entry) 633 634 full_state: dict[str, Any] = { 635 **state_data, 636 } 637 638 if current.state_type == "stream": 639 full_state["streamState"] = updated_streams_raw 640 elif current.state_type == "global": 641 original_global = state_data.get("globalState", {}) 642 full_state["globalState"] = { 643 **original_global, 644 "streamStates": updated_streams_raw, 645 } 646 647 self.import_raw_state(full_state) 648 649 @deprecated("Use 'dump_raw_catalog()' instead.") 650 def 
get_catalog_artifact(self) -> dict[str, Any] | None: 651 """Get the configured catalog for this connection. 652 653 Returns the full configured catalog (syncCatalog) for this connection, 654 including stream schemas, sync modes, cursor fields, and primary keys. 655 656 Uses the Config API endpoint: POST /v1/web_backend/connections/get 657 658 Returns: 659 Dictionary containing the configured catalog, or `None` if not found. 660 """ 661 return self.dump_raw_catalog() 662 663 def dump_raw_catalog( 664 self, 665 *, 666 normalize: bool = True, 667 ) -> dict[str, Any] | None: 668 """Dump the configured catalog for this connection. 669 670 By default, returns the catalog in Airbyte protocol format 671 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 672 to a connector's `--catalog` flag. 673 674 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 675 Config API (camelCase keys, nested `config` block). This raw format can be 676 passed directly to `import_raw_catalog()` for backup/restore workflows. 677 678 Args: 679 normalize: If `True` (default), convert to Airbyte protocol format. 680 If `False`, return the raw Config API catalog. 681 682 Returns: 683 The configured catalog dict, or `None` if not found. 684 """ 685 connection_response = api_util.get_connection_catalog( 686 connection_id=self.connection_id, 687 api_root=self.workspace.api_root, 688 client_id=self.workspace.client_id, 689 client_secret=self.workspace.client_secret, 690 bearer_token=self.workspace.bearer_token, 691 ) 692 raw = connection_response.get("syncCatalog") 693 if raw is None: 694 return None 695 if normalize: 696 return _normalize_catalog_to_protocol(raw) 697 return raw 698 699 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 700 """Replace the configured catalog for this connection. 701 702 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 703 > could result in broken connections, and/or incorrect sync behavior. 
704 705 Accepts a configured catalog dict and replaces the connection's entire 706 catalog with it. All other connection settings remain unchanged. 707 708 Accepts either format: 709 710 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 711 passed through directly. 712 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 713 automatically converted to Config API format before sending. 714 715 Args: 716 catalog: The configured catalog dict in either format. 717 """ 718 if _is_protocol_catalog_format(catalog): 719 catalog = _denormalize_catalog_to_api(catalog) 720 721 api_util.replace_connection_catalog( 722 connection_id=self.connection_id, 723 configured_catalog_dict=catalog, 724 api_root=self.workspace.api_root, 725 client_id=self.workspace.client_id, 726 client_secret=self.workspace.client_secret, 727 bearer_token=self.workspace.bearer_token, 728 ) 729 730 def rename(self, name: str) -> CloudConnection: 731 """Rename the connection. 732 733 Args: 734 name: New name for the connection 735 736 Returns: 737 Updated CloudConnection object with refreshed info 738 """ 739 updated_response = api_util.patch_connection( 740 connection_id=self.connection_id, 741 api_root=self.workspace.api_root, 742 client_id=self.workspace.client_id, 743 client_secret=self.workspace.client_secret, 744 bearer_token=self.workspace.bearer_token, 745 name=name, 746 ) 747 self._connection_info = updated_response 748 return self 749 750 def set_table_prefix(self, prefix: str) -> CloudConnection: 751 """Set the table prefix for the connection. 
752 753 Args: 754 prefix: New table prefix to use when syncing to the destination 755 756 Returns: 757 Updated CloudConnection object with refreshed info 758 """ 759 updated_response = api_util.patch_connection( 760 connection_id=self.connection_id, 761 api_root=self.workspace.api_root, 762 client_id=self.workspace.client_id, 763 client_secret=self.workspace.client_secret, 764 bearer_token=self.workspace.bearer_token, 765 prefix=prefix, 766 ) 767 self._connection_info = updated_response 768 return self 769 770 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 771 """Set the selected streams for the connection. 772 773 This is a destructive operation that can break existing connections if the 774 stream selection is changed incorrectly. Use with caution. 775 776 Args: 777 stream_names: List of stream names to sync 778 779 Returns: 780 Updated CloudConnection object with refreshed info 781 """ 782 configurations = api_util.build_stream_configurations(stream_names) 783 784 updated_response = api_util.patch_connection( 785 connection_id=self.connection_id, 786 api_root=self.workspace.api_root, 787 client_id=self.workspace.client_id, 788 client_secret=self.workspace.client_secret, 789 bearer_token=self.workspace.bearer_token, 790 configurations=configurations, 791 ) 792 self._connection_info = updated_response 793 return self 794 795 # Enable/Disable 796 797 @property 798 def enabled(self) -> bool: 799 """Get the current enabled status of the connection. 800 801 This property always fetches fresh data from the API to ensure accuracy, 802 as another process or user may have toggled the setting. 803 804 Returns: 805 True if the connection status is 'active', False otherwise. 806 """ 807 connection_info = self._fetch_connection_info(force_refresh=True) 808 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE 809 810 @enabled.setter 811 def enabled(self, value: bool) -> None: 812 """Set the enabled status of the connection. 
813 814 Args: 815 value: True to enable (set status to 'active'), False to disable 816 (set status to 'inactive'). 817 """ 818 self.set_enabled(enabled=value) 819 820 def set_enabled( 821 self, 822 *, 823 enabled: bool, 824 ignore_noop: bool = True, 825 ) -> None: 826 """Set the enabled status of the connection. 827 828 Args: 829 enabled: True to enable (set status to 'active'), False to disable 830 (set status to 'inactive'). 831 ignore_noop: If True (default), silently return if the connection is already 832 in the requested state. If False, raise ValueError when the requested 833 state matches the current state. 834 835 Raises: 836 ValueError: If ignore_noop is False and the connection is already in the 837 requested state. 838 """ 839 # Always fetch fresh data to check current status 840 connection_info = self._fetch_connection_info(force_refresh=True) 841 current_status = connection_info.status 842 desired_status = ( 843 api_util.models.ConnectionStatusEnum.ACTIVE 844 if enabled 845 else api_util.models.ConnectionStatusEnum.INACTIVE 846 ) 847 848 if current_status == desired_status: 849 if ignore_noop: 850 return 851 raise ValueError( 852 f"Connection is already {'enabled' if enabled else 'disabled'}. " 853 f"Current status: {current_status}" 854 ) 855 856 updated_response = api_util.patch_connection( 857 connection_id=self.connection_id, 858 api_root=self.workspace.api_root, 859 client_id=self.workspace.client_id, 860 client_secret=self.workspace.client_secret, 861 bearer_token=self.workspace.bearer_token, 862 status=desired_status, 863 ) 864 self._connection_info = updated_response 865 866 # Scheduling 867 868 def set_schedule( 869 self, 870 cron_expression: str, 871 ) -> None: 872 """Set a cron schedule for the connection. 873 874 Args: 875 cron_expression: A cron expression defining when syncs should run. 
876 877 Examples: 878 - "0 0 * * *" - Daily at midnight UTC 879 - "0 */6 * * *" - Every 6 hours 880 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 881 """ 882 schedule = api_util.models.AirbyteAPIConnectionSchedule( 883 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 884 cron_expression=cron_expression, 885 ) 886 updated_response = api_util.patch_connection( 887 connection_id=self.connection_id, 888 api_root=self.workspace.api_root, 889 client_id=self.workspace.client_id, 890 client_secret=self.workspace.client_secret, 891 bearer_token=self.workspace.bearer_token, 892 schedule=schedule, 893 ) 894 self._connection_info = updated_response 895 896 def set_manual_schedule(self) -> None: 897 """Set the connection to manual scheduling. 898 899 Disables automatic syncs. Syncs will only run when manually triggered. 900 """ 901 schedule = api_util.models.AirbyteAPIConnectionSchedule( 902 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 903 ) 904 updated_response = api_util.patch_connection( 905 connection_id=self.connection_id, 906 api_root=self.workspace.api_root, 907 client_id=self.workspace.client_id, 908 client_secret=self.workspace.client_secret, 909 bearer_token=self.workspace.bearer_token, 910 schedule=schedule, 911 ) 912 self._connection_info = updated_response 913 914 # Deletions 915 916 def permanently_delete( 917 self, 918 *, 919 cascade_delete_source: bool = False, 920 cascade_delete_destination: bool = False, 921 ) -> None: 922 """Delete the connection. 923 924 Args: 925 cascade_delete_source: Whether to also delete the source. 926 cascade_delete_destination: Whether to also delete the destination. 927 """ 928 self.workspace.permanently_delete_connection(self) 929 930 if cascade_delete_source: 931 self.workspace.permanently_delete_source(self.source_id) 932 933 if cascade_delete_destination: 934 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
46 def __init__( 47 self, 48 workspace: CloudWorkspace, 49 connection_id: str, 50 source: str | None = None, 51 destination: str | None = None, 52 ) -> None: 53 """It is not recommended to create a `CloudConnection` object directly. 54 55 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 56 """ 57 self.connection_id = connection_id 58 """The ID of the connection.""" 59 60 self.workspace = workspace 61 """The workspace that the connection belongs to.""" 62 63 self._source_id = source 64 """The ID of the source.""" 65 66 self._destination_id = destination 67 """The ID of the destination.""" 68 69 self._connection_info: ConnectionResponse | None = None 70 """The connection info object. (Cached.)""" 71 72 self._cloud_source_object: CloudSource | None = None 73 """The source object. (Cached.)""" 74 75 self._cloud_destination_object: CloudDestination | None = None 76 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
149 def check_is_valid(self) -> bool: 150 """Check if this connection exists and belongs to the expected workspace. 151 152 This method fetches connection info from the API (if not already cached) and 153 verifies that the connection's workspace_id matches the workspace associated 154 with this CloudConnection object. 155 156 Returns: 157 True if the connection exists and belongs to the expected workspace. 158 159 Raises: 160 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 161 AirbyteMissingResourceError: If the connection doesn't exist. 162 """ 163 self._fetch_connection_info(force_refresh=False, verify=True) 164 return True
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
184 @property 185 def name(self) -> str | None: 186 """Get the display name of the connection, if available. 187 188 E.g. "My Postgres to Snowflake", not the connection ID. 189 """ 190 if not self._connection_info: 191 self._connection_info = self._fetch_connection_info() 192 193 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
195 @property 196 def source_id(self) -> str: 197 """The ID of the source.""" 198 if not self._source_id: 199 if not self._connection_info: 200 self._connection_info = self._fetch_connection_info() 201 202 self._source_id = self._connection_info.source_id 203 204 return self._source_id
The ID of the source.
206 @property 207 def source(self) -> CloudSource: 208 """Get the source object.""" 209 if self._cloud_source_object: 210 return self._cloud_source_object 211 212 self._cloud_source_object = CloudSource( 213 workspace=self.workspace, 214 connector_id=self.source_id, 215 ) 216 return self._cloud_source_object
Get the source object.
218 @property 219 def destination_id(self) -> str: 220 """The ID of the destination.""" 221 if not self._destination_id: 222 if not self._connection_info: 223 self._connection_info = self._fetch_connection_info() 224 225 self._destination_id = self._connection_info.destination_id 226 227 return self._destination_id
The ID of the destination.
229 @property 230 def destination(self) -> CloudDestination: 231 """Get the destination object.""" 232 if self._cloud_destination_object: 233 return self._cloud_destination_object 234 235 self._cloud_destination_object = CloudDestination( 236 workspace=self.workspace, 237 connector_id=self.destination_id, 238 ) 239 return self._cloud_destination_object
Get the destination object.
241 @property 242 def stream_names(self) -> list[str]: 243 """The stream names.""" 244 if not self._connection_info: 245 self._connection_info = self._fetch_connection_info() 246 247 return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
249 @property 250 def table_prefix(self) -> str: 251 """The table prefix.""" 252 if not self._connection_info: 253 self._connection_info = self._fetch_connection_info() 254 255 return self._connection_info.prefix or ""
The table prefix.
257 @property 258 def connection_url(self) -> str | None: 259 """The web URL to the connection.""" 260 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
262 @property 263 def job_history_url(self) -> str | None: 264 """The URL to the job history for the connection.""" 265 return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
269 def run_sync( 270 self, 271 *, 272 wait: bool = True, 273 wait_timeout: int = 300, 274 ) -> SyncResult: 275 """Run a sync.""" 276 connection_response = api_util.run_connection( 277 connection_id=self.connection_id, 278 api_root=self.workspace.api_root, 279 workspace_id=self.workspace.workspace_id, 280 client_id=self.workspace.client_id, 281 client_secret=self.workspace.client_secret, 282 bearer_token=self.workspace.bearer_token, 283 ) 284 sync_result = SyncResult( 285 workspace=self.workspace, 286 connection=self, 287 job_id=connection_response.job_id, 288 ) 289 290 if wait: 291 sync_result.wait_for_completion( 292 wait_timeout=wait_timeout, 293 raise_failure=True, 294 raise_timeout=True, 295 ) 296 297 return sync_result
Run a sync.
308 def get_previous_sync_logs( 309 self, 310 *, 311 limit: int = 20, 312 offset: int | None = None, 313 from_tail: bool = True, 314 job_type: JobTypeEnum | None = None, 315 ) -> list[SyncResult]: 316 """Get previous sync jobs for a connection with pagination support. 317 318 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 319 rows_synced, start_time). Full log text can be fetched lazily via 320 `SyncResult.get_full_log_text()`. 321 322 Args: 323 limit: Maximum number of jobs to return. Defaults to 20. 324 offset: Number of jobs to skip from the beginning. Defaults to None (0). 325 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 326 If False, returns jobs ordered oldest-first (createdAt ASC). 327 Defaults to True. 328 job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). 329 If not specified, defaults to sync and reset jobs only (API default behavior). 330 331 Returns: 332 A list of SyncResult objects representing the sync jobs. 333 """ 334 order_by = ( 335 api_util.JOB_ORDER_BY_CREATED_AT_DESC 336 if from_tail 337 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 338 ) 339 sync_logs: list[JobResponse] = api_util.get_job_logs( 340 connection_id=self.connection_id, 341 api_root=self.workspace.api_root, 342 workspace_id=self.workspace.workspace_id, 343 limit=limit, 344 offset=offset, 345 order_by=order_by, 346 job_type=job_type, 347 client_id=self.workspace.client_id, 348 client_secret=self.workspace.client_secret, 349 bearer_token=self.workspace.bearer_token, 350 ) 351 return [ 352 SyncResult( 353 workspace=self.workspace, 354 connection=self, 355 job_id=sync_log.job_id, 356 _latest_job_info=sync_log, 357 ) 358 for sync_log in sync_logs 359 ]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
- job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:
A list of SyncResult objects representing the sync jobs.
361 def get_sync_result( 362 self, 363 job_id: int | None = None, 364 ) -> SyncResult | None: 365 """Get the sync result for the connection. 366 367 If `job_id` is not provided, the most recent sync job will be used. 368 369 Returns `None` if job_id is omitted and no previous jobs are found. 370 """ 371 if job_id is None: 372 # Get the most recent sync job 373 results = self.get_previous_sync_logs( 374 limit=1, 375 ) 376 if results: 377 return results[0] 378 379 return None 380 381 # Get the sync job by ID (lazy loaded) 382 return SyncResult( 383 workspace=self.workspace, 384 connection=self, 385 job_id=job_id, 386 )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
390 @deprecated("Use 'dump_raw_state()' instead.") 391 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 392 """Deprecated. Use `dump_raw_state()` instead.""" 393 state_response = api_util.get_connection_state( 394 connection_id=self.connection_id, 395 api_root=self.workspace.api_root, 396 client_id=self.workspace.client_id, 397 client_secret=self.workspace.client_secret, 398 bearer_token=self.workspace.bearer_token, 399 ) 400 if state_response.get("stateType") == "not_set": 401 return None 402 return state_response.get("streamState", [])
Deprecated. Use dump_raw_state() instead.
410 def dump_raw_state( 411 self, 412 *, 413 normalize: bool = True, 414 ) -> dict[str, Any] | list[dict[str, Any]]: 415 """Dump the state for this connection. 416 417 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 418 with snake_case keys, suitable for passing to a connector's `--state` flag. 419 420 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 421 includes `stateType` and `connectionId`). This raw format can be passed 422 directly to `import_raw_state()` for backup/restore workflows. 423 424 Args: 425 normalize: If `True` (default), convert to Airbyte protocol format. 426 If `False`, return the raw Config API response. 427 428 Returns: 429 Normalized: list of protocol-format state message dicts (empty list if 430 no state). Raw: the full Config API state dict. 431 """ 432 raw = api_util.get_connection_state( 433 connection_id=self.connection_id, 434 api_root=self.workspace.api_root, 435 client_id=self.workspace.client_id, 436 client_secret=self.workspace.client_secret, 437 bearer_token=self.workspace.bearer_token, 438 ) 439 if normalize: 440 return _normalize_state_to_protocol(raw) 441 return raw
Dump the state for this connection.
By default, returns a list of Airbyte protocol AirbyteStateMessage dicts
with snake_case keys, suitable for passing to a connector's --state flag.
When normalize is False, returns the raw Config API dict (camelCase keys,
includes stateType and connectionId). This raw format can be passed
directly to import_raw_state() for backup/restore workflows.
Arguments:
- normalize: If
True(default), convert to Airbyte protocol format. IfFalse, return the raw Config API response.
Returns:
Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.
443 def import_raw_state( 444 self, 445 connection_state: dict[str, Any] | list[dict[str, Any]], 446 ) -> dict[str, Any]: 447 """Import (restore) the full state for this connection. 448 449 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 450 > could result in broken connections, and/or incorrect sync behavior. 451 452 Replaces the entire connection state with the provided state blob. 453 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 454 455 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 456 The `connectionId` in the blob is always overridden with this connection's 457 ID, making state blobs portable across connections. 458 459 Accepts either format: 460 461 - **Config API format** (dict with `stateType`): passed through directly. 462 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 463 converted to Config API format before sending. 464 465 Args: 466 connection_state: Connection state in either Config API or Airbyte protocol format. 467 468 Returns: 469 The updated connection state as a dictionary. 470 471 Raises: 472 AirbyteConnectionSyncActiveError: If a sync is currently running on this 473 connection (HTTP 423). Wait for the sync to complete before retrying. 474 """ 475 api_state: dict[str, Any] 476 if isinstance(connection_state, list): 477 if not _is_protocol_state_format(connection_state): 478 msg = ( 479 "Expected connection_state list to contain Airbyte protocol state " 480 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 481 "or LEGACY). Got a list that does not match protocol format." 
482 ) 483 raise ValueError(msg) 484 api_state = _denormalize_protocol_state_to_api( 485 protocol_messages=connection_state, 486 connection_id=self.connection_id, 487 ) 488 elif isinstance(connection_state, dict): 489 if _is_protocol_state_format(connection_state): 490 api_state = _denormalize_protocol_state_to_api( 491 protocol_messages=[connection_state], 492 connection_id=self.connection_id, 493 ) 494 else: 495 api_state = connection_state 496 else: 497 msg = f"Expected a dict or list, got {type(connection_state)}" 498 raise TypeError(msg) 499 500 return api_util.replace_connection_state( 501 connection_id=self.connection_id, 502 connection_state_dict=api_state, 503 api_root=self.workspace.api_root, 504 client_id=self.workspace.client_id, 505 client_secret=self.workspace.client_secret, 506 bearer_token=self.workspace.bearer_token, 507 )
Import (restore) the full state for this connection.
⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
This is the counterpart to dump_raw_state() for backup/restore workflows.
The connectionId in the blob is always overridden with this connection's
ID, making state blobs portable across connections.
Accepts either format:
- Config API format (dict with
stateType): passed through directly. - Airbyte protocol format (list of
AirbyteStateMessagedicts): automatically converted to Config API format before sending.
Arguments:
- connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:
The updated connection state as a dictionary.
Raises:
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
509 def get_stream_state( 510 self, 511 stream_name: str, 512 stream_namespace: str | None = None, 513 ) -> dict[str, Any] | None: 514 """Get the state blob for a single stream within this connection. 515 516 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 517 not the full connection state envelope. 518 519 This is compatible with `stream`-type state and stream-level entries 520 within a `global`-type state. It is not compatible with `legacy` state. 521 To get or set the entire connection-level state artifact, use 522 `dump_raw_state` and `import_raw_state` instead. 523 524 Args: 525 stream_name: The name of the stream to get state for. 526 stream_namespace: The source-side stream namespace. This refers to the 527 namespace from the source (e.g., database schema), not any destination 528 namespace override set in connection advanced settings. 529 530 Returns: 531 The stream's state blob as a dictionary, or None if the stream is not found. 532 """ 533 state_data = self.dump_raw_state(normalize=False) 534 result = ConnectionStateResponse(**state_data) 535 536 streams = _get_stream_list(result) 537 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 538 539 if not matching: 540 available = [s.stream_descriptor.name for s in streams] 541 logger.warning( 542 "Stream '%s' not found in connection state for connection '%s'. " 543 "Available streams: %s", 544 stream_name, 545 self.connection_id, 546 available, 547 ) 548 return None 549 550 return matching[0].stream_state
Get the state blob for a single stream within this connection.
Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Arguments:
- stream_name: The name of the stream to get state for.
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:
The stream's state blob as a dictionary, or None if the stream is not found.
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        # Fetch the raw (camelCase) state and a parsed, typed view of it.
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # Build the raw Config API entry for the target stream; namespace is
        # included in the descriptor only when provided.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Select the raw stream-entry list that matches the state type:
        # top-level for `stream` state, nested under globalState for `global`.
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Walk raw and parsed entries in lockstep, swapping in the new entry
        # where the parsed view matches. Assumes _get_stream_list yields
        # entries in the same order as the raw list (both derive from the
        # same response); strict=False tolerates a length mismatch.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        # No existing entry for this stream: append it.
        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full state envelope, preserving all other keys, and
        # write back only the updated stream list in the right location.
        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)
649 @deprecated("Use 'dump_raw_catalog()' instead.") 650 def get_catalog_artifact(self) -> dict[str, Any] | None: 651 """Get the configured catalog for this connection. 652 653 Returns the full configured catalog (syncCatalog) for this connection, 654 including stream schemas, sync modes, cursor fields, and primary keys. 655 656 Uses the Config API endpoint: POST /v1/web_backend/connections/get 657 658 Returns: 659 Dictionary containing the configured catalog, or `None` if not found. 660 """ 661 return self.dump_raw_catalog()
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or
Noneif not found.
663 def dump_raw_catalog( 664 self, 665 *, 666 normalize: bool = True, 667 ) -> dict[str, Any] | None: 668 """Dump the configured catalog for this connection. 669 670 By default, returns the catalog in Airbyte protocol format 671 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 672 to a connector's `--catalog` flag. 673 674 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 675 Config API (camelCase keys, nested `config` block). This raw format can be 676 passed directly to `import_raw_catalog()` for backup/restore workflows. 677 678 Args: 679 normalize: If `True` (default), convert to Airbyte protocol format. 680 If `False`, return the raw Config API catalog. 681 682 Returns: 683 The configured catalog dict, or `None` if not found. 684 """ 685 connection_response = api_util.get_connection_catalog( 686 connection_id=self.connection_id, 687 api_root=self.workspace.api_root, 688 client_id=self.workspace.client_id, 689 client_secret=self.workspace.client_secret, 690 bearer_token=self.workspace.bearer_token, 691 ) 692 raw = connection_response.get("syncCatalog") 693 if raw is None: 694 return None 695 if normalize: 696 return _normalize_catalog_to_protocol(raw) 697 return raw
Dump the configured catalog for this connection.
By default, returns the catalog in Airbyte protocol format
(ConfiguredAirbyteCatalog with snake_case keys), suitable for passing
to a connector's --catalog flag.
When normalize is False, returns the raw syncCatalog dict from the
Config API (camelCase keys, nested config block). This raw format can be
passed directly to import_raw_catalog() for backup/restore workflows.
Arguments:
- normalize: If
True(default), convert to Airbyte protocol format. IfFalse, return the raw Config API catalog.
Returns:
The configured catalog dict, or
Noneif not found.
699 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 700 """Replace the configured catalog for this connection. 701 702 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 703 > could result in broken connections, and/or incorrect sync behavior. 704 705 Accepts a configured catalog dict and replaces the connection's entire 706 catalog with it. All other connection settings remain unchanged. 707 708 Accepts either format: 709 710 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 711 passed through directly. 712 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 713 automatically converted to Config API format before sending. 714 715 Args: 716 catalog: The configured catalog dict in either format. 717 """ 718 if _is_protocol_catalog_format(catalog): 719 catalog = _denormalize_catalog_to_api(catalog) 720 721 api_util.replace_connection_catalog( 722 connection_id=self.connection_id, 723 configured_catalog_dict=catalog, 724 api_root=self.workspace.api_root, 725 client_id=self.workspace.client_id, 726 client_secret=self.workspace.client_secret, 727 bearer_token=self.workspace.bearer_token, 728 )
Replace the configured catalog for this connection.
⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.
Accepts either format:
- Config API format (
syncCatalogwith camelCase keys and nestedconfig): passed through directly. - Airbyte protocol format (
ConfiguredAirbyteCatalogwith snake_case keys): automatically converted to Config API format before sending.
Arguments:
- catalog: The configured catalog dict in either format.
730 def rename(self, name: str) -> CloudConnection: 731 """Rename the connection. 732 733 Args: 734 name: New name for the connection 735 736 Returns: 737 Updated CloudConnection object with refreshed info 738 """ 739 updated_response = api_util.patch_connection( 740 connection_id=self.connection_id, 741 api_root=self.workspace.api_root, 742 client_id=self.workspace.client_id, 743 client_secret=self.workspace.client_secret, 744 bearer_token=self.workspace.bearer_token, 745 name=name, 746 ) 747 self._connection_info = updated_response 748 return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
750 def set_table_prefix(self, prefix: str) -> CloudConnection: 751 """Set the table prefix for the connection. 752 753 Args: 754 prefix: New table prefix to use when syncing to the destination 755 756 Returns: 757 Updated CloudConnection object with refreshed info 758 """ 759 updated_response = api_util.patch_connection( 760 connection_id=self.connection_id, 761 api_root=self.workspace.api_root, 762 client_id=self.workspace.client_id, 763 client_secret=self.workspace.client_secret, 764 bearer_token=self.workspace.bearer_token, 765 prefix=prefix, 766 ) 767 self._connection_info = updated_response 768 return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
770 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 771 """Set the selected streams for the connection. 772 773 This is a destructive operation that can break existing connections if the 774 stream selection is changed incorrectly. Use with caution. 775 776 Args: 777 stream_names: List of stream names to sync 778 779 Returns: 780 Updated CloudConnection object with refreshed info 781 """ 782 configurations = api_util.build_stream_configurations(stream_names) 783 784 updated_response = api_util.patch_connection( 785 connection_id=self.connection_id, 786 api_root=self.workspace.api_root, 787 client_id=self.workspace.client_id, 788 client_secret=self.workspace.client_secret, 789 bearer_token=self.workspace.bearer_token, 790 configurations=configurations, 791 ) 792 self._connection_info = updated_response 793 return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
797 @property 798 def enabled(self) -> bool: 799 """Get the current enabled status of the connection. 800 801 This property always fetches fresh data from the API to ensure accuracy, 802 as another process or user may have toggled the setting. 803 804 Returns: 805 True if the connection status is 'active', False otherwise. 806 """ 807 connection_info = self._fetch_connection_info(force_refresh=True) 808 return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
820 def set_enabled( 821 self, 822 *, 823 enabled: bool, 824 ignore_noop: bool = True, 825 ) -> None: 826 """Set the enabled status of the connection. 827 828 Args: 829 enabled: True to enable (set status to 'active'), False to disable 830 (set status to 'inactive'). 831 ignore_noop: If True (default), silently return if the connection is already 832 in the requested state. If False, raise ValueError when the requested 833 state matches the current state. 834 835 Raises: 836 ValueError: If ignore_noop is False and the connection is already in the 837 requested state. 838 """ 839 # Always fetch fresh data to check current status 840 connection_info = self._fetch_connection_info(force_refresh=True) 841 current_status = connection_info.status 842 desired_status = ( 843 api_util.models.ConnectionStatusEnum.ACTIVE 844 if enabled 845 else api_util.models.ConnectionStatusEnum.INACTIVE 846 ) 847 848 if current_status == desired_status: 849 if ignore_noop: 850 return 851 raise ValueError( 852 f"Connection is already {'enabled' if enabled else 'disabled'}. " 853 f"Current status: {current_status}" 854 ) 855 856 updated_response = api_util.patch_connection( 857 connection_id=self.connection_id, 858 api_root=self.workspace.api_root, 859 client_id=self.workspace.client_id, 860 client_secret=self.workspace.client_secret, 861 bearer_token=self.workspace.bearer_token, 862 status=desired_status, 863 ) 864 self._connection_info = updated_response
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
868 def set_schedule( 869 self, 870 cron_expression: str, 871 ) -> None: 872 """Set a cron schedule for the connection. 873 874 Args: 875 cron_expression: A cron expression defining when syncs should run. 876 877 Examples: 878 - "0 0 * * *" - Daily at midnight UTC 879 - "0 */6 * * *" - Every 6 hours 880 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 881 """ 882 schedule = api_util.models.AirbyteAPIConnectionSchedule( 883 schedule_type=api_util.models.ScheduleTypeEnum.CRON, 884 cron_expression=cron_expression, 885 ) 886 updated_response = api_util.patch_connection( 887 connection_id=self.connection_id, 888 api_root=self.workspace.api_root, 889 client_id=self.workspace.client_id, 890 client_secret=self.workspace.client_secret, 891 bearer_token=self.workspace.bearer_token, 892 schedule=schedule, 893 ) 894 self._connection_info = updated_response
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
896 def set_manual_schedule(self) -> None: 897 """Set the connection to manual scheduling. 898 899 Disables automatic syncs. Syncs will only run when manually triggered. 900 """ 901 schedule = api_util.models.AirbyteAPIConnectionSchedule( 902 schedule_type=api_util.models.ScheduleTypeEnum.MANUAL, 903 ) 904 updated_response = api_util.patch_connection( 905 connection_id=self.connection_id, 906 api_root=self.workspace.api_root, 907 client_id=self.workspace.client_id, 908 client_secret=self.workspace.client_secret, 909 bearer_token=self.workspace.bearer_token, 910 schedule=schedule, 911 ) 912 self._connection_info = updated_response
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
916 def permanently_delete( 917 self, 918 *, 919 cascade_delete_source: bool = False, 920 cascade_delete_destination: bool = False, 921 ) -> None: 922 """Delete the connection. 923 924 Args: 925 cascade_delete_source: Whether to also delete the source. 926 cascade_delete_destination: Whether to also delete the destination. 927 """ 928 self.workspace.permanently_delete_connection(self) 929 930 if cascade_delete_source: 931 self.workspace.permanently_delete_source(self.source_id) 932 933 if cascade_delete_destination: 934 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.