airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Literal, overload

from typing_extensions import deprecated

from airbyte._util import api_util
from airbyte.cloud._connection_catalog import (
    _denormalize_catalog_to_api,
    _is_protocol_catalog_format,
    _normalize_catalog_to_protocol,
)
from airbyte.cloud._connection_state import (
    ConnectionStateResponse,
    _denormalize_protocol_state_to_api,
    _get_stream_list,
    _is_protocol_state_format,
    _match_stream,
    _normalize_state_to_protocol,
)
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.models import (
    CloudConnectionInfo,
    CloudJobInfo,
    JobTypeEnum,
    _ConnectionResponseLike,
)
from airbyte.cloud.sync_results import SyncResult
from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError


logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:  # noqa: PLR0904  # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: CloudConnectionInfo | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(
        self,
        *,
        force_refresh: bool = False,
        verify: bool = True,
    ) -> CloudConnectionInfo:
        """Fetch and cache connection info from the API.

        By default, this method will only fetch from the API if connection info is not
        already cached. It also verifies that the connection belongs to the expected
        workspace unless verification is explicitly disabled.

        Freshly fetched info is cached only after it passed verification (when
        verification is requested), so a workspace mismatch never leaves
        foreign-workspace info behind in the cache.

        Args:
            force_refresh: If True, always fetch from the API even if cached.
                If False (default), only fetch if not already cached.
            verify: If True (default), verify that the connection is valid (e.g., that
                the workspace_id matches this object's workspace). Raises an error if
                validation fails.

        Returns:
            Information about the connection from the API.

        Raises:
            AirbyteWorkspaceMismatchError: If verify is True and the connection's
                workspace_id doesn't match the expected workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        if not force_refresh and self._connection_info is not None:
            # Use cached info, but still verify if requested
            if verify:
                self._verify_workspace_match(self._connection_info)
            return self._connection_info

        # Fetch from API
        connection_response = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        result = CloudConnectionInfo.from_api_response(connection_response)

        # Verify *before* caching: the properties below read the cache without
        # re-verifying, so mismatched info must never be stored.
        if verify:
            self._verify_workspace_match(result)

        self._connection_info = result
        return result

    def _cached_connection_info(self) -> CloudConnectionInfo:
        """Return the cached connection info, fetching it from the API if not yet cached.

        Unlike `_fetch_connection_info()`, a cache hit is returned as-is, without
        re-running workspace verification.
        """
        return self._connection_info or self._fetch_connection_info()

    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
        """Verify that the connection belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
        """
        if connection_info.workspace_id != self.workspace.workspace_id:
            raise AirbyteWorkspaceMismatchError(
                resource_type="connection",
                resource_id=self.connection_id,
                workspace=self.workspace,
                expected_workspace_id=self.workspace.workspace_id,
                actual_workspace_id=connection_info.workspace_id,
                message=(
                    f"Connection '{self.connection_id}' belongs to workspace "
                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
                ),
            )

    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: _ConnectionResponseLike,
    ) -> CloudConnection:
        """Create a CloudConnection from an API connection response.

        The parsed response is stored as the new object's cached connection info,
        so no additional API call is needed to read its properties.
        """
        connection_info = CloudConnectionInfo.from_api_response(connection_response)
        result = cls(
            workspace=workspace,
            connection_id=connection_info.connection_id,
            source=connection_info.source_id,
            destination=connection_info.destination_id,
        )
        result._connection_info = connection_info  # noqa: SLF001  # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        return self._cached_connection_info().name

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Looked up from the connection info (fetching it if needed) when the ID was
        not provided at construction time.
        """
        if not self._source_id:
            self._source_id = self._cached_connection_info().source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object. (Created on first access, then cached.)"""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Looked up from the connection info (fetching it if needed) when the ID was
        not provided at construction time.
        """
        if not self._destination_id:
            self._destination_id = self._cached_connection_info().destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object. (Created on first access, then cached.)"""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        info = self._cached_connection_info()
        return [stream.name for stream in info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix, or an empty string if no prefix is set."""
        return self._cached_connection_info().prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: If True (default), block until the job completes. Failures and
                timeouts are raised while waiting.
            wait_timeout: Timeout passed to `SyncResult.wait_for_completion()`.
                Only used when `wait` is True.

        Returns:
            The `SyncResult` for the triggered job.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 20,
        offset: int | None = None,
        from_tail: bool = True,
        job_type: str | JobTypeEnum | None = None,
    ) -> list[SyncResult]:
        """Get previous sync jobs for a connection with pagination support.

        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
        rows_synced, start_time). Full log text can be fetched lazily via
        `SyncResult.get_full_log_text()`.

        Args:
            limit: Maximum number of jobs to return. Defaults to 20.
            offset: Number of jobs to skip from the beginning. Defaults to None (0).
            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
                If False, returns jobs ordered oldest-first (createdAt ASC).
                Defaults to True.
            job_type: Filter by job type (e.g., `sync`, `refresh`).
                If not specified, defaults to sync and reset jobs only (API default behavior).

        Returns:
            A list of SyncResult objects representing the sync jobs.
        """
        order_by = (
            api_util.JOB_ORDER_BY_CREATED_AT_DESC
            if from_tail
            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
        )
        sync_logs = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            offset=offset,
            order_by=order_by,
            job_type=job_type,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Artifacts

    @deprecated("Use 'dump_raw_state()' instead.")
    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Deprecated. Use `dump_raw_state()` instead.

        Returns the `streamState` list of the connection state response (an empty
        list if that key is absent), or `None` if the state type is `not_set`.
        """
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if state_response.get("stateType") == "not_set":
            return None
        return state_response.get("streamState", [])

    @overload
    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...

    @overload
    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...

    def dump_raw_state(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """Dump the state for this connection.

        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
        with snake_case keys, suitable for passing to a connector's `--state` flag.

        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
        includes `stateType` and `connectionId`). This raw format can be passed
        directly to `import_raw_state()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API response.

        Returns:
            Normalized: list of protocol-format state message dicts (empty list if
            no state). Raw: the full Config API state dict.
        """
        raw = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if normalize:
            return _normalize_state_to_protocol(raw)
        return raw

    def import_raw_state(
        self,
        connection_state: dict[str, Any] | list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Import (restore) the full state for this connection.

        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Replaces the entire connection state with the provided state blob.
        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
        The `connectionId` in the blob is always overridden with this connection's
        ID, making state blobs portable across connections.

        Accepts either format:

        - **Config API format** (dict with `stateType`): passed through directly.
        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
          converted to Config API format before sending. A single protocol-format
          message dict is also accepted and treated as a one-element list.

        Args:
            connection_state: Connection state in either Config API or Airbyte protocol format.

        Returns:
            The updated connection state as a dictionary.

        Raises:
            ValueError: If a list is passed that is not in Airbyte protocol format.
            TypeError: If `connection_state` is neither a dict nor a list.
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        api_state: dict[str, Any]
        if isinstance(connection_state, list):
            if not _is_protocol_state_format(connection_state):
                msg = (
                    "Expected connection_state list to contain Airbyte protocol state "
                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                    "or LEGACY). Got a list that does not match protocol format."
                )
                raise ValueError(msg)
            api_state = _denormalize_protocol_state_to_api(
                protocol_messages=connection_state,
                connection_id=self.connection_id,
            )
        elif isinstance(connection_state, dict):
            if _is_protocol_state_format(connection_state):
                # A single protocol message: wrap it so it converts like a list.
                api_state = _denormalize_protocol_state_to_api(
                    protocol_messages=[connection_state],
                    connection_id=self.connection_id,
                )
            else:
                api_state = connection_state
        else:
            msg = f"Expected a dict or list, got {type(connection_state)}"
            raise TypeError(msg)

        return api_util.replace_connection_state(
            connection_id=self.connection_id,
            connection_state_dict=api_state,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

    def get_stream_state(
        self,
        stream_name: str,
        stream_namespace: str | None = None,
    ) -> dict[str, Any] | None:
        """Get the state blob for a single stream within this connection.

        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
        not the full connection state envelope.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Args:
            stream_name: The name of the stream to get state for.
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Returns:
            The stream's state blob as a dictionary, or None if the stream is not found
            (a warning listing the available streams is logged in that case).
        """
        state_data = self.dump_raw_state(normalize=False)
        result = ConnectionStateResponse(**state_data)

        streams = _get_stream_list(result)
        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]

        if not matching:
            available = [s.stream_descriptor.name for s in streams]
            logger.warning(
                "Stream '%s' not found in connection state for connection '%s'. "
                "Available streams: %s",
                stream_name,
                self.connection_id,
                available,
            )
            return None

        return matching[0].stream_state

    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # The namespace key is only included when a (non-empty) namespace is given.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Walk the raw entries and their parsed counterparts in lockstep: matching
        # is done on the parsed model, while the untouched raw dicts are what gets
        # sent back to the API.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        if not found:
            updated_streams_raw.append(new_stream_entry)

        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            # Preserve every other key of the global state; only swap the stream list.
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)

    @deprecated("Use 'dump_raw_catalog()' instead.")
    def get_catalog_artifact(self) -> dict[str, Any] | None:
        """Get the configured catalog for this connection.

        Returns the full configured catalog (syncCatalog) for this connection,
        including stream schemas, sync modes, cursor fields, and primary keys.

        This delegates to `dump_raw_catalog()` with its default arguments, so the
        catalog is returned in normalized (Airbyte protocol) form.

        Uses the Config API endpoint: POST /v1/web_backend/connections/get

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        return self.dump_raw_catalog()

    def dump_raw_catalog(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | None:
        """Dump the configured catalog for this connection.

        By default, returns the catalog in Airbyte protocol format
        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
        to a connector's `--catalog` flag.

        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
        Config API (camelCase keys, nested `config` block). This raw format can be
        passed directly to `import_raw_catalog()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API catalog.

        Returns:
            The configured catalog dict, or `None` if not found.
        """
        connection_response = api_util.get_connection_catalog(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        raw = connection_response.get("syncCatalog")
        if raw is None:
            return None
        if normalize:
            return _normalize_catalog_to_protocol(raw)
        return raw

    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
        """Replace the configured catalog for this connection.

        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Accepts a configured catalog dict and replaces the connection's entire
        catalog with it. All other connection settings remain unchanged.

        Accepts either format:

        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
          passed through directly.
        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
          automatically converted to Config API format before sending.

        Args:
            catalog: The configured catalog dict in either format.
        """
        if _is_protocol_catalog_format(catalog):
            catalog = _denormalize_catalog_to_api(catalog)

        api_util.replace_connection_catalog(
            connection_id=self.connection_id,
            configured_catalog_dict=catalog,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            name=name,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            prefix=prefix,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            configurations=configurations,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    # Enable/Disable

    @property
    def enabled(self) -> bool:
        """Get the current enabled status of the connection.

        This property always fetches fresh data from the API to ensure accuracy,
        as another process or user may have toggled the setting.

        Returns:
            True if the connection status is 'active', False otherwise.
        """
        connection_info = self._fetch_connection_info(force_refresh=True)
        return connection_info.status == "active"

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set the enabled status of the connection.

        Args:
            value: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
        """
        self.set_enabled(enabled=value)

    def set_enabled(
        self,
        *,
        enabled: bool,
        ignore_noop: bool = True,
    ) -> None:
        """Set the enabled status of the connection.

        Args:
            enabled: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
            ignore_noop: If True (default), silently return if the connection is already
                in the requested state. If False, raise ValueError when the requested
                state matches the current state.

        Raises:
            ValueError: If ignore_noop is False and the connection is already in the
                requested state.
        """
        # Always fetch fresh data to check current status
        connection_info = self._fetch_connection_info(force_refresh=True)
        current_status = connection_info.status
        desired_status = "active" if enabled else "inactive"

        if current_status == desired_status:
            if ignore_noop:
                return
            raise ValueError(
                f"Connection is already {'enabled' if enabled else 'disabled'}. "
                f"Current status: {current_status}"
            )

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            status=desired_status,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    # Scheduling

    def set_schedule(
        self,
        cron_expression: str,
    ) -> None:
        """Set a cron schedule for the connection.

        Args:
            cron_expression: A cron expression defining when syncs should run.

        Examples:
            - "0 0 * * *" - Daily at midnight UTC
            - "0 */6 * * *" - Every 6 hours
            - "0 0 * * 0" - Weekly on Sunday at midnight UTC
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(
                schedule_type="cron",
                cron_expression=cron_expression,
            ),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    def set_manual_schedule(self) -> None:
        """Set the connection to manual scheduling.

        Disables automatic syncs. Syncs will only run when manually triggered.
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(schedule_type="manual"),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve the connector IDs *before* deleting the connection. The ID
        # properties may have to look them up from the connection info via the
        # API, which is no longer possible once the connection is gone.
        source_id: str | None = self.source_id if cascade_delete_source else None
        destination_id: str | None = (
            self.destination_id if cascade_delete_destination else None
        )

        self.workspace.permanently_delete_connection(self)

        if source_id is not None:
            self.workspace.permanently_delete_source(source_id)

        if destination_id is not None:
            self.workspace.permanently_delete_destination(destination_id)
class CloudConnection:  # noqa: PLR0904  # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: Optional ID of the source. Looked up lazily when omitted.
            destination: Optional ID of the destination. Looked up lazily when omitted.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: CloudConnectionInfo | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(
        self,
        *,
        force_refresh: bool = False,
        verify: bool = True,
    ) -> CloudConnectionInfo:
        """Fetch and cache connection info from the API.

        By default, this method will only fetch from the API if connection info is not
        already cached. It also verifies that the connection belongs to the expected
        workspace unless verification is explicitly disabled.

        Args:
            force_refresh: If True, always fetch from the API even if cached.
                If False (default), only fetch if not already cached.
            verify: If True (default), verify that the connection is valid (e.g., that
                the workspace_id matches this object's workspace). Raises an error if
                validation fails.

        Returns:
            Information about the connection from the API.

        Raises:
            AirbyteWorkspaceMismatchError: If verify is True and the connection's
                workspace_id doesn't match the expected workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        if not force_refresh and self._connection_info is not None:
            # Use cached info, but still verify if requested
            if verify:
                self._verify_workspace_match(self._connection_info)
            return self._connection_info

        # Fetch from API
        connection_info = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        result = CloudConnectionInfo.from_api_response(connection_info)

        # The result is cached before verification, so a later cached call with
        # `verify=True` re-runs the workspace check against the same data.
        self._connection_info = result

        # Verify if requested
        if verify:
            self._verify_workspace_match(result)

        return result

    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
        """Verify that the connection belongs to the expected workspace.

        Args:
            connection_info: The connection info whose `workspace_id` is compared against
                this object's workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
        """
        if connection_info.workspace_id != self.workspace.workspace_id:
            raise AirbyteWorkspaceMismatchError(
                resource_type="connection",
                resource_id=self.connection_id,
                workspace=self.workspace,
                expected_workspace_id=self.workspace.workspace_id,
                actual_workspace_id=connection_info.workspace_id,
                message=(
                    f"Connection '{self.connection_id}' belongs to workspace "
                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
                ),
            )

    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: _ConnectionResponseLike,
    ) -> CloudConnection:
        """Create a CloudConnection from an API connection response.

        The parsed connection info is stored on the new object so that later property
        access does not need another API call.
        """
        connection_info = CloudConnectionInfo.from_api_response(connection_response)
        result = cls(
            workspace=workspace,
            connection_id=connection_info.connection_id,
            source=connection_info.source_id,
            destination=connection_info.destination_id,
        )
        result._connection_info = connection_info  # noqa: SLF001  # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Resolved lazily from the connection info when it was not passed to the constructor.
        """
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object. (Created on first access, then cached.)"""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Resolved lazily from the connection info when it was not passed to the constructor.
        """
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object. (Created on first access, then cached.)"""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix, or an empty string if no prefix is set."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: If True (default), block until the job completes, raising on failure
                or timeout.
            wait_timeout: Timeout value passed to `SyncResult.wait_for_completion()`.

        Returns:
            A `SyncResult` for the job that was started.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection.

        Note: `source_id` and `destination_id` are lazy properties, so this may fetch
        connection info from the API when those IDs are not cached yet.
        """
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 20,
        offset: int | None = None,
        from_tail: bool = True,
        job_type: str | JobTypeEnum | None = None,
    ) -> list[SyncResult]:
        """Get previous sync jobs for a connection with pagination support.

        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
        rows_synced, start_time). Full log text can be fetched lazily via
        `SyncResult.get_full_log_text()`.

        Args:
            limit: Maximum number of jobs to return. Defaults to 20.
            offset: Number of jobs to skip from the beginning. Defaults to None (0).
            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
                If False, returns jobs ordered oldest-first (createdAt ASC).
                Defaults to True.
            job_type: Filter by job type (e.g., `sync`, `refresh`).
                If not specified, defaults to sync and reset jobs only (API default behavior).

        Returns:
            A list of SyncResult objects representing the sync jobs.
        """
        order_by = (
            api_util.JOB_ORDER_BY_CREATED_AT_DESC
            if from_tail
            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
        )
        sync_logs = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            offset=offset,
            order_by=order_by,
            job_type=job_type,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Artifacts

    @deprecated("Use 'dump_raw_state()' instead.")
    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Deprecated. Use `dump_raw_state()` instead."""
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if state_response.get("stateType") == "not_set":
            return None
        return state_response.get("streamState", [])

    @overload
    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...

    @overload
    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...

    def dump_raw_state(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """Dump the state for this connection.

        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
        with snake_case keys, suitable for passing to a connector's `--state` flag.

        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
        includes `stateType` and `connectionId`). This raw format can be passed
        directly to `import_raw_state()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API response.

        Returns:
            Normalized: list of protocol-format state message dicts (empty list if
            no state). Raw: the full Config API state dict.
        """
        raw = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if normalize:
            return _normalize_state_to_protocol(raw)
        return raw

    def import_raw_state(
        self,
        connection_state: dict[str, Any] | list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Import (restore) the full state for this connection.

        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Replaces the entire connection state with the provided state blob.
        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
        The `connectionId` in the blob is always overridden with this connection's
        ID, making state blobs portable across connections.

        Accepts either format:

        - **Config API format** (dict with `stateType`): passed through directly.
        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
          converted to Config API format before sending.

        Args:
            connection_state: Connection state in either Config API or Airbyte protocol format.

        Returns:
            The updated connection state as a dictionary.

        Raises:
            ValueError: If a list is provided that does not match the Airbyte protocol
                state format.
            TypeError: If `connection_state` is neither a dict nor a list.
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        api_state: dict[str, Any]
        if isinstance(connection_state, list):
            if not _is_protocol_state_format(connection_state):
                msg = (
                    "Expected connection_state list to contain Airbyte protocol state "
                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                    "or LEGACY). Got a list that does not match protocol format."
                )
                raise ValueError(msg)
            api_state = _denormalize_protocol_state_to_api(
                protocol_messages=connection_state,
                connection_id=self.connection_id,
            )
        elif isinstance(connection_state, dict):
            if _is_protocol_state_format(connection_state):
                # A single protocol message dict is wrapped in a list before conversion.
                api_state = _denormalize_protocol_state_to_api(
                    protocol_messages=[connection_state],
                    connection_id=self.connection_id,
                )
            else:
                api_state = connection_state
        else:
            msg = f"Expected a dict or list, got {type(connection_state)}"
            raise TypeError(msg)

        return api_util.replace_connection_state(
            connection_id=self.connection_id,
            connection_state_dict=api_state,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

    def get_stream_state(
        self,
        stream_name: str,
        stream_namespace: str | None = None,
    ) -> dict[str, Any] | None:
        """Get the state blob for a single stream within this connection.

        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
        not the full connection state envelope.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Args:
            stream_name: The name of the stream to get state for.
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Returns:
            The stream's state blob as a dictionary, or None if the stream is not found
            (in which case a warning listing the available streams is logged).
        """
        state_data = self.dump_raw_state(normalize=False)
        result = ConnectionStateResponse(**state_data)

        streams = _get_stream_list(result)
        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]

        if not matching:
            available = [s.stream_descriptor.name for s in streams]
            logger.warning(
                "Stream '%s' not found in connection state for connection '%s'. "
                "Available streams: %s",
                stream_name,
                self.connection_id,
                available,
            )
            return None

        return matching[0].stream_state

    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # The namespace key is only included when a non-empty namespace was given.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            # NOTE(review): for any other state type the updated list is never written
            # back below, so the state is re-imported unchanged. Confirm this is intended.
            raw_streams = []

        # Raw dicts and parsed entries are walked in lockstep so that untouched streams
        # are written back exactly as they were received.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        if not found:
            updated_streams_raw.append(new_stream_entry)

        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)

    @deprecated("Use 'dump_raw_catalog()' instead.")
    def get_catalog_artifact(self) -> dict[str, Any] | None:
        """Get the configured catalog for this connection.

        Returns the full configured catalog (syncCatalog) for this connection,
        including stream schemas, sync modes, cursor fields, and primary keys.

        Uses the Config API endpoint: POST /v1/web_backend/connections/get

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        return self.dump_raw_catalog()

    def dump_raw_catalog(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | None:
        """Dump the configured catalog for this connection.

        By default, returns the catalog in Airbyte protocol format
        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
        to a connector's `--catalog` flag.

        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
        Config API (camelCase keys, nested `config` block). This raw format can be
        passed directly to `import_raw_catalog()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API catalog.

        Returns:
            The configured catalog dict, or `None` if not found.
        """
        connection_response = api_util.get_connection_catalog(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        raw = connection_response.get("syncCatalog")
        if raw is None:
            return None
        if normalize:
            return _normalize_catalog_to_protocol(raw)
        return raw

    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
        """Replace the configured catalog for this connection.

        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Accepts a configured catalog dict and replaces the connection's entire
        catalog with it. All other connection settings remain unchanged.

        Accepts either format:

        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
          passed through directly.
        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
          automatically converted to Config API format before sending.

        Args:
            catalog: The configured catalog dict in either format.
        """
        if _is_protocol_catalog_format(catalog):
            catalog = _denormalize_catalog_to_api(catalog)

        api_util.replace_connection_catalog(
            connection_id=self.connection_id,
            configured_catalog_dict=catalog,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            name=name,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            prefix=prefix,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            configurations=configurations,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

    # Enable/Disable

    @property
    def enabled(self) -> bool:
        """Get the current enabled status of the connection.

        This property always fetches fresh data from the API to ensure accuracy,
        as another process or user may have toggled the setting.

        Returns:
            True if the connection status is 'active', False otherwise.
        """
        connection_info = self._fetch_connection_info(force_refresh=True)
        return connection_info.status == "active"

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set the enabled status of the connection.

        Args:
            value: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
        """
        self.set_enabled(enabled=value)

    def set_enabled(
        self,
        *,
        enabled: bool,
        ignore_noop: bool = True,
    ) -> None:
        """Set the enabled status of the connection.

        Args:
            enabled: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
            ignore_noop: If True (default), silently return if the connection is already
                in the requested state. If False, raise ValueError when the requested
                state matches the current state.

        Raises:
            ValueError: If ignore_noop is False and the connection is already in the
                requested state.
        """
        # Always fetch fresh data to check current status
        connection_info = self._fetch_connection_info(force_refresh=True)
        current_status = connection_info.status
        desired_status = "active" if enabled else "inactive"

        if current_status == desired_status:
            if ignore_noop:
                return
            raise ValueError(
                f"Connection is already {'enabled' if enabled else 'disabled'}. "
                f"Current status: {current_status}"
            )

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            status=desired_status,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    # Scheduling

    def set_schedule(
        self,
        cron_expression: str,
    ) -> None:
        """Set a cron schedule for the connection.

        Args:
            cron_expression: A cron expression defining when syncs should run.

        Examples:
                - "0 0 * * *" - Daily at midnight UTC
                - "0 */6 * * *" - Every 6 hours
                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(
                schedule_type="cron",
                cron_expression=cron_expression,
            ),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    def set_manual_schedule(self) -> None:
        """Set the connection to manual scheduling.

        Disables automatic syncs. Syncs will only run when manually triggered.
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(schedule_type="manual"),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        The source and destination IDs needed for cascade deletion are resolved
        before the connection is deleted. `source_id` and `destination_id` are lazy
        properties that may need to fetch connection info from the API, and that
        lookup can no longer succeed once the connection itself is gone.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve the IDs while the connection still exists.
        source_id = self.source_id if cascade_delete_source else None
        destination_id = self.destination_id if cascade_delete_destination else None

        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
50 def __init__( 51 self, 52 workspace: CloudWorkspace, 53 connection_id: str, 54 source: str | None = None, 55 destination: str | None = None, 56 ) -> None: 57 """It is not recommended to create a `CloudConnection` object directly. 58 59 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 60 """ 61 self.connection_id = connection_id 62 """The ID of the connection.""" 63 64 self.workspace = workspace 65 """The workspace that the connection belongs to.""" 66 67 self._source_id = source 68 """The ID of the source.""" 69 70 self._destination_id = destination 71 """The ID of the destination.""" 72 73 self._connection_info: CloudConnectionInfo | None = None 74 """The connection info object. (Cached.)""" 75 76 self._cloud_source_object: CloudSource | None = None 77 """The source object. (Cached.)""" 78 79 self._cloud_destination_object: CloudDestination | None = None 80 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
153 def check_is_valid(self) -> bool: 154 """Check if this connection exists and belongs to the expected workspace. 155 156 This method fetches connection info from the API (if not already cached) and 157 verifies that the connection's workspace_id matches the workspace associated 158 with this CloudConnection object. 159 160 Returns: 161 True if the connection exists and belongs to the expected workspace. 162 163 Raises: 164 AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace. 165 AirbyteMissingResourceError: If the connection doesn't exist. 166 """ 167 self._fetch_connection_info(force_refresh=False, verify=True) 168 return True
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
189 @property 190 def name(self) -> str | None: 191 """Get the display name of the connection, if available. 192 193 E.g. "My Postgres to Snowflake", not the connection ID. 194 """ 195 if not self._connection_info: 196 self._connection_info = self._fetch_connection_info() 197 198 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
200 @property 201 def source_id(self) -> str: 202 """The ID of the source.""" 203 if not self._source_id: 204 if not self._connection_info: 205 self._connection_info = self._fetch_connection_info() 206 207 self._source_id = self._connection_info.source_id 208 209 return self._source_id
The ID of the source.
211 @property 212 def source(self) -> CloudSource: 213 """Get the source object.""" 214 if self._cloud_source_object: 215 return self._cloud_source_object 216 217 self._cloud_source_object = CloudSource( 218 workspace=self.workspace, 219 connector_id=self.source_id, 220 ) 221 return self._cloud_source_object
Get the source object.
223 @property 224 def destination_id(self) -> str: 225 """The ID of the destination.""" 226 if not self._destination_id: 227 if not self._connection_info: 228 self._connection_info = self._fetch_connection_info() 229 230 self._destination_id = self._connection_info.destination_id 231 232 return self._destination_id
The ID of the destination.
234 @property 235 def destination(self) -> CloudDestination: 236 """Get the destination object.""" 237 if self._cloud_destination_object: 238 return self._cloud_destination_object 239 240 self._cloud_destination_object = CloudDestination( 241 workspace=self.workspace, 242 connector_id=self.destination_id, 243 ) 244 return self._cloud_destination_object
Get the destination object.
@property
def stream_names(self) -> list[str]:
    """Return the names of the streams configured on this connection.

    Reads `configurations.streams` from the connection info (fetching the
    info if it is not yet cached). An empty list is returned when no
    streams are configured.
    """
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    # `streams` may be empty/None; treat that as "no streams".
    configured_streams = info.configurations.streams or []
    return [configured.name for configured in configured_streams]
The stream names.
@property
def table_prefix(self) -> str:
    """Return the table prefix configured on this connection.

    Reads `prefix` from the connection info (fetching the info if it is
    not yet cached). An empty string is returned when no prefix is set.
    """
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    prefix = info.prefix
    return prefix if prefix else ""
The table prefix.
@property
def connection_url(self) -> str | None:
    """Return the web URL of this connection.

    The URL is the workspace URL followed by `/connections/<connection_id>`.
    """
    workspace_url = self.workspace.workspace_url
    return f"{workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
@property
def job_history_url(self) -> str | None:
    """Return the URL of this connection's job history page.

    The URL is `connection_url` followed by `/timeline`.
    """
    base_url = self.connection_url
    return f"{base_url}/timeline"
The URL to the job history for the connection.
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job for this connection.

    Args:
        wait: If True (default), block until the job completes. Failures and
            timeouts are raised while waiting.
        wait_timeout: Timeout passed to `SyncResult.wait_for_completion()`
            when `wait` is True.

    Returns:
        A `SyncResult` bound to the job that was started.
    """
    workspace = self.workspace
    job_response = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        workspace_id=workspace.workspace_id,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )
    result = SyncResult(
        workspace=workspace,
        connection=self,
        job_id=job_response.job_id,
    )

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result
Run a sync.
def get_previous_sync_logs(
    self,
    *,
    limit: int = 20,
    offset: int | None = None,
    from_tail: bool = True,
    job_type: str | JobTypeEnum | None = None,
) -> list[SyncResult]:
    """List earlier jobs for this connection, with pagination.

    Each returned `SyncResult` is pre-populated with the job info parsed
    from the API response, so job metadata is available without another
    request.

    Args:
        limit: Maximum number of jobs to return. Defaults to 20.
        offset: Number of jobs to skip. Defaults to None.
        from_tail: If True (default), request newest-first ordering
            (`JOB_ORDER_BY_CREATED_AT_DESC`); if False, oldest-first
            (`JOB_ORDER_BY_CREATED_AT_ASC`).
        job_type: Optional job-type filter (e.g., `sync`, `refresh`),
            forwarded to the API as-is. When omitted, the API's default
            filter applies.

    Returns:
        A list of `SyncResult` objects, one per job returned by the API.
    """
    if from_tail:
        ordering = api_util.JOB_ORDER_BY_CREATED_AT_DESC
    else:
        ordering = api_util.JOB_ORDER_BY_CREATED_AT_ASC

    workspace = self.workspace
    job_responses = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        workspace_id=workspace.workspace_id,
        limit=limit,
        offset=offset,
        order_by=ordering,
        job_type=job_type,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
    )

    return [
        SyncResult(
            workspace=workspace,
            connection=self,
            job_id=job_response.job_id,
            _latest_job_info=CloudJobInfo.from_api_response(job_response),
        )
        for job_response in job_responses
    ]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
- job_type: Filter by job type (e.g., `sync`, `refresh`). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:
A list of SyncResult objects representing the sync jobs.
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Return the sync result for a specific job, or for the latest job.

    Args:
        job_id: The job to look up. When omitted, the first entry of
            `get_previous_sync_logs(limit=1)` is used.

    Returns:
        A `SyncResult` for the job, or `None` if `job_id` is omitted and no
        previous jobs are found.
    """
    if job_id is not None:
        # Explicit job ID: the result object loads its details lazily.
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    latest = self.get_previous_sync_logs(
        limit=1,
    )
    return latest[0] if latest else None
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, Any]] | None:
    """Deprecated. Use `dump_raw_state()` instead.

    Returns:
        `None` when the state response reports `stateType == "not_set"`;
        otherwise the response's `streamState` list (an empty list if that
        key is absent).
    """
    workspace = self.workspace
    response = api_util.get_connection_state(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        config_api_root=workspace.config_api_root,
    )
    state_is_unset = response.get("stateType") == "not_set"
    return None if state_is_unset else response.get("streamState", [])
Deprecated. Use dump_raw_state() instead.
def dump_raw_state(
    self,
    *,
    normalize: bool = True,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Fetch and return the state of this connection.

    By default the state is converted with `_normalize_state_to_protocol()`
    into Airbyte protocol `AirbyteStateMessage` dicts (snake_case keys),
    suitable for passing to a connector's `--state` flag.

    With `normalize=False` the Config API response is returned untouched
    (camelCase keys); that form can be handed back to `import_raw_state()`
    for backup/restore workflows.

    Args:
        normalize: If `True` (default), convert to Airbyte protocol format.
            If `False`, return the raw Config API response.

    Returns:
        Normalized: a list of protocol-format state message dicts.
        Raw: the Config API state dict.
    """
    workspace = self.workspace
    api_state = api_util.get_connection_state(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        config_api_root=workspace.config_api_root,
    )
    if not normalize:
        return api_state

    return _normalize_state_to_protocol(api_state)
Dump the state for this connection.
By default, returns a list of Airbyte protocol AirbyteStateMessage dicts
with snake_case keys, suitable for passing to a connector's --state flag.
When normalize is False, returns the raw Config API dict (camelCase keys,
includes stateType and connectionId). This raw format can be passed
directly to import_raw_state() for backup/restore workflows.
Arguments:
- normalize: If `True` (default), convert to Airbyte protocol format. If `False`, return the raw Config API response.
Returns:
Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.
def import_raw_state(
    self,
    connection_state: dict[str, Any] | list[dict[str, Any]],
) -> dict[str, Any]:
    """Import (restore) the full state for this connection.

    > ⚠️ **WARNING:** Modifying the state directly is not recommended and
    > could result in broken connections, and/or incorrect sync behavior.

    Replaces the entire connection state with the provided state blob via
    `api_util.replace_connection_state()`. This is the counterpart to
    `dump_raw_state()` for backup/restore workflows. The request is always
    issued against this connection's ID.

    Accepted inputs:

    - **Airbyte protocol format**: a list of `AirbyteStateMessage` dicts, or
      a single such dict. Converted to Config API format before sending.
    - **Config API format**: any other dict. Passed through unchanged.

    Args:
        connection_state: Connection state in either Config API or Airbyte
            protocol format.

    Returns:
        The updated connection state as a dictionary.

    Raises:
        TypeError: If `connection_state` is neither a dict nor a list.
        ValueError: If `connection_state` is a list that does not match the
            Airbyte protocol state format.
        AirbyteConnectionSyncActiveError: If a sync is currently running on this
            connection (HTTP 423). Wait for the sync to complete before retrying.
    """
    if not isinstance(connection_state, (list, dict)):
        msg = f"Expected a dict or list, got {type(connection_state)}"
        raise TypeError(msg)

    # Work out which protocol messages (if any) need converting.
    protocol_messages: list[dict[str, Any]] | None
    if isinstance(connection_state, list):
        if not _is_protocol_state_format(connection_state):
            msg = (
                "Expected connection_state list to contain Airbyte protocol state "
                "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                "or LEGACY). Got a list that does not match protocol format."
            )
            raise ValueError(msg)
        protocol_messages = connection_state
    elif _is_protocol_state_format(connection_state):
        # A single protocol message is wrapped so it converts like a list.
        protocol_messages = [connection_state]
    else:
        protocol_messages = None

    api_state: dict[str, Any]
    if protocol_messages is None:
        api_state = connection_state
    else:
        api_state = _denormalize_protocol_state_to_api(
            protocol_messages=protocol_messages,
            connection_id=self.connection_id,
        )

    workspace = self.workspace
    return api_util.replace_connection_state(
        connection_id=self.connection_id,
        connection_state_dict=api_state,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        config_api_root=workspace.config_api_root,
    )
Import (restore) the full state for this connection.
⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
This is the counterpart to dump_raw_state() for backup/restore workflows.
The connectionId in the blob is always overridden with this connection's
ID, making state blobs portable across connections.
Accepts either format:
- Config API format (dict with `stateType`): passed through directly.
- Airbyte protocol format (list of `AirbyteStateMessage` dicts): automatically converted to Config API format before sending.
Arguments:
- connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:
The updated connection state as a dictionary.
Raises:
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
def get_stream_state(
    self,
    stream_name: str,
    stream_namespace: str | None = None,
) -> dict[str, Any] | None:
    """Return the state blob of one stream within this connection.

    Only the stream's own state dictionary is returned (e.g.,
    {"cursor": "2024-01-01"}), not the full connection state envelope.
    To read or write the entire connection-level state, use
    `dump_raw_state` and `import_raw_state` instead.

    Args:
        stream_name: The name of the stream to get state for.
        stream_namespace: The source-side stream namespace used when
            matching the stream.

    Returns:
        The state blob of the first matching stream, or `None` (after
        logging a warning that lists the available stream names) if no
        stream matches.
    """
    raw_state = self.dump_raw_state(normalize=False)
    parsed_state = ConnectionStateResponse(**raw_state)
    stream_entries = _get_stream_list(parsed_state)

    matches = [
        entry for entry in stream_entries if _match_stream(entry, stream_name, stream_namespace)
    ]
    if matches:
        return matches[0].stream_state

    available = [entry.stream_descriptor.name for entry in stream_entries]
    logger.warning(
        "Stream '%s' not found in connection state for connection '%s'. "
        "Available streams: %s",
        stream_name,
        self.connection_id,
        available,
    )
    return None
Get the state blob for a single stream within this connection.
Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Arguments:
- stream_name: The name of the stream to get state for.
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:
The stream's state blob as a dictionary, or None if the stream is not found.
def set_stream_state(
    self,
    stream_name: str,
    state_blob_dict: dict[str, Any],
    stream_namespace: str | None = None,
) -> None:
    """Set the state of one stream within this connection.

    Fetches the current raw state, swaps in a new entry for the matching
    stream (or appends one if no stream matches), and writes the whole
    state back through `import_raw_state()`.

    Works with `stream`-type state and with the stream-level entries of a
    `global`-type state. `not_set` and `legacy` states are rejected. To
    read or write the entire connection-level state, use `dump_raw_state`
    and `import_raw_state` instead.

    Args:
        stream_name: The name of the stream to update state for.
        state_blob_dict: The state blob dict for this stream
            (e.g., {"cursor": "2024-01-01"}).
        stream_namespace: The source-side stream namespace. It is used for
            matching and, when truthy, written into the new entry's
            `streamDescriptor`.

    Raises:
        PyAirbyteInputError: If the connection state type is `not_set` or
            `legacy`.
        AirbyteConnectionSyncActiveError: If a sync is currently running on this
            connection (HTTP 423). Wait for the sync to complete before retrying.
    """
    raw_state = self.dump_raw_state(normalize=False)
    parsed_state = ConnectionStateResponse(**raw_state)
    state_type = parsed_state.state_type

    if state_type == "not_set":
        raise PyAirbyteInputError(
            message="Cannot set stream state: connection has no existing state.",
            context={"connection_id": self.connection_id},
        )

    if state_type == "legacy":
        raise PyAirbyteInputError(
            message="Cannot set stream state on a legacy-type connection state.",
            context={"connection_id": self.connection_id},
        )

    # The namespace key is only included when a namespace was provided.
    descriptor: dict[str, Any] = {"name": stream_name}
    if stream_namespace:
        descriptor["namespace"] = stream_namespace
    replacement_entry = {
        "streamDescriptor": descriptor,
        "streamState": state_blob_dict,
    }

    existing_raw: list[dict[str, Any]]
    if state_type == "stream":
        existing_raw = raw_state.get("streamState", [])
    elif state_type == "global":
        existing_raw = raw_state.get("globalState", {}).get("streamStates", [])
    else:
        existing_raw = []

    # Walk raw and parsed entries in lockstep: matching uses the parsed
    # form, while unmatched entries are carried over in their raw form.
    parsed_entries = _get_stream_list(parsed_state)
    replaced = False
    merged_raw: list[dict[str, Any]] = []
    for raw_entry, parsed_entry in zip(existing_raw, parsed_entries, strict=False):
        if _match_stream(parsed_entry, stream_name, stream_namespace):
            merged_raw.append(replacement_entry)
            replaced = True
            continue
        merged_raw.append(raw_entry)

    if not replaced:
        merged_raw.append(replacement_entry)

    updated_state: dict[str, Any] = dict(raw_state)
    if state_type == "stream":
        updated_state["streamState"] = merged_raw
    elif state_type == "global":
        # Keep every other key of the global state as it was.
        updated_state["globalState"] = {
            **raw_state.get("globalState", {}),
            "streamStates": merged_raw,
        }

    self.import_raw_state(updated_state)
Set the state for a single stream within this connection.
Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Uses the safe variant that prevents updates while a sync is running (HTTP 423).
Arguments:
- stream_name: The name of the stream to update state for.
- state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
- PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, Any] | None:
    """Deprecated. Use `dump_raw_catalog()` instead.

    Delegates to `dump_raw_catalog()` with its default arguments.

    Returns:
        Dictionary containing the configured catalog, or `None` if not found.
    """
    catalog = self.dump_raw_catalog()
    return catalog
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or `None` if not found.
def dump_raw_catalog(
    self,
    *,
    normalize: bool = True,
) -> dict[str, Any] | None:
    """Fetch and return the configured catalog of this connection.

    By default the catalog is converted with
    `_normalize_catalog_to_protocol()` into Airbyte protocol format
    (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
    to a connector's `--catalog` flag.

    With `normalize=False` the `syncCatalog` value from the API response is
    returned untouched; that form can be handed back to
    `import_raw_catalog()` for backup/restore workflows.

    Args:
        normalize: If `True` (default), convert to Airbyte protocol format.
            If `False`, return the raw Config API catalog.

    Returns:
        The configured catalog dict, or `None` if the response has no
        `syncCatalog`.
    """
    workspace = self.workspace
    response = api_util.get_connection_catalog(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        config_api_root=workspace.config_api_root,
    )
    sync_catalog = response.get("syncCatalog")

    # A missing catalog and the raw (un-normalized) request both return as-is.
    if sync_catalog is None or not normalize:
        return sync_catalog

    return _normalize_catalog_to_protocol(sync_catalog)
Dump the configured catalog for this connection.
By default, returns the catalog in Airbyte protocol format
(ConfiguredAirbyteCatalog with snake_case keys), suitable for passing
to a connector's --catalog flag.
When normalize is False, returns the raw syncCatalog dict from the
Config API (camelCase keys, nested config block). This raw format can be
passed directly to import_raw_catalog() for backup/restore workflows.
Arguments:
- normalize: If `True` (default), convert to Airbyte protocol format. If `False`, return the raw Config API catalog.
Returns:
The configured catalog dict, or `None` if not found.
def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
    """Replace the configured catalog for this connection.

    > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
    > could result in broken connections, and/or incorrect sync behavior.

    The connection's catalog is replaced with the one given, via
    `api_util.replace_connection_catalog()`.

    Accepts either format:

    - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case
      keys): converted to Config API format before sending.
    - **Config API format** (`syncCatalog`): passed through unchanged.

    Args:
        catalog: The configured catalog dict in either format.
    """
    if _is_protocol_catalog_format(catalog):
        api_catalog = _denormalize_catalog_to_api(catalog)
    else:
        api_catalog = catalog

    workspace = self.workspace
    api_util.replace_connection_catalog(
        connection_id=self.connection_id,
        configured_catalog_dict=api_catalog,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        config_api_root=workspace.config_api_root,
    )
Replace the configured catalog for this connection.
⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.
Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.
Accepts either format:
- Config API format (`syncCatalog` with camelCase keys and nested `config`): passed through directly.
- Airbyte protocol format (`ConfiguredAirbyteCatalog` with snake_case keys): automatically converted to Config API format before sending.
Arguments:
- catalog: The configured catalog dict in either format.
def rename(self, name: str) -> CloudConnection:
    """Rename the connection.

    Patches the connection through the API and refreshes the cached
    connection info from the response.

    Args:
        name: New name for the connection.

    Returns:
        This same `CloudConnection` object, with refreshed info.
    """
    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        name=name,
    )
    refreshed_info = CloudConnectionInfo.from_api_response(patched)
    self._connection_info = refreshed_info
    return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
def set_table_prefix(self, prefix: str) -> CloudConnection:
    """Set the table prefix for the connection.

    Patches the connection through the API and refreshes the cached
    connection info from the response.

    Args:
        prefix: New table prefix to use when syncing to the destination.

    Returns:
        This same `CloudConnection` object, with refreshed info.
    """
    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        prefix=prefix,
    )
    refreshed_info = CloudConnectionInfo.from_api_response(patched)
    self._connection_info = refreshed_info
    return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
    """Set the selected streams for the connection.

    This is a destructive operation that can break existing connections if
    the stream selection is changed incorrectly. Use with caution.

    The stream configurations are built with
    `api_util.build_stream_configurations()`, sent as a patch to the API,
    and the cached connection info is refreshed from the response.

    Args:
        stream_names: List of stream names to sync.

    Returns:
        This same `CloudConnection` object, with refreshed info.
    """
    stream_configurations = api_util.build_stream_configurations(stream_names)

    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        configurations=stream_configurations,
    )
    refreshed_info = CloudConnectionInfo.from_api_response(patched)
    self._connection_info = refreshed_info
    return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
@property
def enabled(self) -> bool:
    """Report whether the connection is currently enabled.

    Connection info is re-fetched with `force_refresh=True` on every
    access, so a change made by another process or user is picked up.

    Returns:
        True if the connection status is 'active', False otherwise.
    """
    fresh_info = self._fetch_connection_info(force_refresh=True)
    is_active = fresh_info.status == "active"
    return is_active
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
def set_enabled(
    self,
    *,
    enabled: bool,
    ignore_noop: bool = True,
) -> None:
    """Enable or disable the connection.

    The current status is always re-fetched first. When it already matches
    the requested one, no patch request is sent.

    Args:
        enabled: True to enable (set status to 'active'), False to disable
            (set status to 'inactive').
        ignore_noop: If True (default), silently return if the connection is
            already in the requested state. If False, raise ValueError in
            that case.

    Raises:
        ValueError: If ignore_noop is False and the connection is already in
            the requested state.
    """
    target_status = "active" if enabled else "inactive"
    # Force a refresh so the comparison uses the live status.
    live_status = self._fetch_connection_info(force_refresh=True).status

    already_in_state = live_status == target_status
    if already_in_state and ignore_noop:
        return
    if already_in_state:
        state_label = "enabled" if enabled else "disabled"
        raise ValueError(
            f"Connection is already {state_label}. "
            f"Current status: {live_status}"
        )

    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        status=target_status,
    )
    self._connection_info = CloudConnectionInfo.from_api_response(patched)
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
def set_schedule(
    self,
    cron_expression: str,
) -> None:
    """Set a cron schedule for the connection.

    Builds a `cron`-type schedule with
    `api_util.build_connection_schedule()`, patches the connection, and
    refreshes the cached connection info from the response.

    Args:
        cron_expression: A cron expression defining when syncs should run.

    Examples:
        - "0 0 * * *" - Daily at midnight UTC
        - "0 */6 * * *" - Every 6 hours
        - "0 0 * * 0" - Weekly on Sunday at midnight UTC
    """
    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        schedule=api_util.build_connection_schedule(
            schedule_type="cron",
            cron_expression=cron_expression,
        ),
    )
    refreshed_info = CloudConnectionInfo.from_api_response(patched)
    self._connection_info = refreshed_info
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
    """Switch the connection to manual scheduling.

    Disables automatic syncs. Syncs will only run when manually triggered.
    A `manual`-type schedule is sent as a patch, and the cached connection
    info is refreshed from the response.
    """
    workspace = self.workspace
    patched = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        client_id=workspace.client_id,
        client_secret=workspace.client_secret,
        bearer_token=workspace.bearer_token,
        schedule=api_util.build_connection_schedule(schedule_type="manual"),
    )
    refreshed_info = CloudConnectionInfo.from_api_response(patched)
    self._connection_info = refreshed_info
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection, optionally with its source and destination.

    The connection is deleted first; the source and/or destination are
    deleted afterwards when requested.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    # Resolve the connector IDs *before* deleting the connection. The
    # `source_id` / `destination_id` properties may need to fetch connection
    # info, and that lookup cannot succeed once the connection is gone.
    source_id = self.source_id if cascade_delete_source else None
    destination_id = self.destination_id if cascade_delete_destination else None

    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.