airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6import logging
  7from typing import TYPE_CHECKING, Any, Literal, overload
  8
  9from typing_extensions import deprecated
 10
 11from airbyte._util import api_util
 12from airbyte.cloud._connection_catalog import (
 13    _denormalize_catalog_to_api,
 14    _is_protocol_catalog_format,
 15    _normalize_catalog_to_protocol,
 16)
 17from airbyte.cloud._connection_state import (
 18    ConnectionStateResponse,
 19    _denormalize_protocol_state_to_api,
 20    _get_stream_list,
 21    _is_protocol_state_format,
 22    _match_stream,
 23    _normalize_state_to_protocol,
 24)
 25from airbyte.cloud.connectors import CloudDestination, CloudSource
 26from airbyte.cloud.models import (
 27    CloudConnectionInfo,
 28    CloudJobInfo,
 29    JobTypeEnum,
 30    _ConnectionResponseLike,
 31)
 32from airbyte.cloud.sync_results import SyncResult
 33from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError
 34
 35
# Module-level logger named after this module.
logger = logging.getLogger(__name__)


# Imported only while type checking; in the code shown here the name is used
# solely inside annotations, which `from __future__ import annotations` keeps lazy.
if TYPE_CHECKING:
    from airbyte.cloud.workspaces import CloudWorkspace
 41
 42
class CloudConnection:  # noqa: PLR0904  # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection
    (for example its name, table prefix, selected streams, state, and catalog).
    """
 48
 49    def __init__(
 50        self,
 51        workspace: CloudWorkspace,
 52        connection_id: str,
 53        source: str | None = None,
 54        destination: str | None = None,
 55    ) -> None:
 56        """It is not recommended to create a `CloudConnection` object directly.
 57
 58        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 59        """
 60        self.connection_id = connection_id
 61        """The ID of the connection."""
 62
 63        self.workspace = workspace
 64        """The workspace that the connection belongs to."""
 65
 66        self._source_id = source
 67        """The ID of the source."""
 68
 69        self._destination_id = destination
 70        """The ID of the destination."""
 71
 72        self._connection_info: CloudConnectionInfo | None = None
 73        """The connection info object. (Cached.)"""
 74
 75        self._cloud_source_object: CloudSource | None = None
 76        """The source object. (Cached.)"""
 77
 78        self._cloud_destination_object: CloudDestination | None = None
 79        """The destination object. (Cached.)"""
 80
 81    def _fetch_connection_info(
 82        self,
 83        *,
 84        force_refresh: bool = False,
 85        verify: bool = True,
 86    ) -> CloudConnectionInfo:
 87        """Fetch and cache connection info from the API.
 88
 89        By default, this method will only fetch from the API if connection info is not
 90        already cached. It also verifies that the connection belongs to the expected
 91        workspace unless verification is explicitly disabled.
 92
 93        Args:
 94            force_refresh: If True, always fetch from the API even if cached.
 95                If False (default), only fetch if not already cached.
 96            verify: If True (default), verify that the connection is valid (e.g., that
 97                the workspace_id matches this object's workspace). Raises an error if
 98                validation fails.
 99
100        Returns:
101            Information about the connection from the API.
102
103        Raises:
104            AirbyteWorkspaceMismatchError: If verify is True and the connection's
105                workspace_id doesn't match the expected workspace.
106            AirbyteMissingResourceError: If the connection doesn't exist.
107        """
108        if not force_refresh and self._connection_info is not None:
109            # Use cached info, but still verify if requested
110            if verify:
111                self._verify_workspace_match(self._connection_info)
112            return self._connection_info
113
114        # Fetch from API
115        connection_info = api_util.get_connection(
116            workspace_id=self.workspace.workspace_id,
117            connection_id=self.connection_id,
118            api_root=self.workspace.api_root,
119            client_id=self.workspace.client_id,
120            client_secret=self.workspace.client_secret,
121            bearer_token=self.workspace.bearer_token,
122        )
123        result = CloudConnectionInfo.from_api_response(connection_info)
124
125        self._connection_info = result
126
127        # Verify if requested
128        if verify:
129            self._verify_workspace_match(result)
130
131        return result
132
133    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
134        """Verify that the connection belongs to the expected workspace.
135
136        Raises:
137            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
138        """
139        if connection_info.workspace_id != self.workspace.workspace_id:
140            raise AirbyteWorkspaceMismatchError(
141                resource_type="connection",
142                resource_id=self.connection_id,
143                workspace=self.workspace,
144                expected_workspace_id=self.workspace.workspace_id,
145                actual_workspace_id=connection_info.workspace_id,
146                message=(
147                    f"Connection '{self.connection_id}' belongs to workspace "
148                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
149                ),
150            )
151
152    def check_is_valid(self) -> bool:
153        """Check if this connection exists and belongs to the expected workspace.
154
155        This method fetches connection info from the API (if not already cached) and
156        verifies that the connection's workspace_id matches the workspace associated
157        with this CloudConnection object.
158
159        Returns:
160            True if the connection exists and belongs to the expected workspace.
161
162        Raises:
163            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
164            AirbyteMissingResourceError: If the connection doesn't exist.
165        """
166        self._fetch_connection_info(force_refresh=False, verify=True)
167        return True
168
169    @classmethod
170    def _from_connection_response(
171        cls,
172        workspace: CloudWorkspace,
173        connection_response: _ConnectionResponseLike,
174    ) -> CloudConnection:
175        """Create a CloudConnection from an API connection response."""
176        connection_info = CloudConnectionInfo.from_api_response(connection_response)
177        result = cls(
178            workspace=workspace,
179            connection_id=connection_info.connection_id,
180            source=connection_info.source_id,
181            destination=connection_info.destination_id,
182        )
183        result._connection_info = connection_info  # noqa: SLF001 # Accessing Non-Public API
184        return result
185
186    # Properties
187
188    @property
189    def name(self) -> str | None:
190        """Get the display name of the connection, if available.
191
192        E.g. "My Postgres to Snowflake", not the connection ID.
193        """
194        if not self._connection_info:
195            self._connection_info = self._fetch_connection_info()
196
197        return self._connection_info.name
198
199    @property
200    def source_id(self) -> str:
201        """The ID of the source."""
202        if not self._source_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._source_id = self._connection_info.source_id
207
208        return self._source_id
209
210    @property
211    def source(self) -> CloudSource:
212        """Get the source object."""
213        if self._cloud_source_object:
214            return self._cloud_source_object
215
216        self._cloud_source_object = CloudSource(
217            workspace=self.workspace,
218            connector_id=self.source_id,
219        )
220        return self._cloud_source_object
221
222    @property
223    def destination_id(self) -> str:
224        """The ID of the destination."""
225        if not self._destination_id:
226            if not self._connection_info:
227                self._connection_info = self._fetch_connection_info()
228
229            self._destination_id = self._connection_info.destination_id
230
231        return self._destination_id
232
233    @property
234    def destination(self) -> CloudDestination:
235        """Get the destination object."""
236        if self._cloud_destination_object:
237            return self._cloud_destination_object
238
239        self._cloud_destination_object = CloudDestination(
240            workspace=self.workspace,
241            connector_id=self.destination_id,
242        )
243        return self._cloud_destination_object
244
245    @property
246    def stream_names(self) -> list[str]:
247        """The stream names."""
248        if not self._connection_info:
249            self._connection_info = self._fetch_connection_info()
250
251        return [stream.name for stream in self._connection_info.configurations.streams or []]
252
253    @property
254    def table_prefix(self) -> str:
255        """The table prefix."""
256        if not self._connection_info:
257            self._connection_info = self._fetch_connection_info()
258
259        return self._connection_info.prefix or ""
260
261    @property
262    def connection_url(self) -> str | None:
263        """The web URL to the connection."""
264        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
265
266    @property
267    def job_history_url(self) -> str | None:
268        """The URL to the job history for the connection."""
269        return f"{self.connection_url}/timeline"
270
271    # Run Sync
272
273    def run_sync(
274        self,
275        *,
276        wait: bool = True,
277        wait_timeout: int = 300,
278    ) -> SyncResult:
279        """Run a sync."""
280        connection_response = api_util.run_connection(
281            connection_id=self.connection_id,
282            api_root=self.workspace.api_root,
283            workspace_id=self.workspace.workspace_id,
284            client_id=self.workspace.client_id,
285            client_secret=self.workspace.client_secret,
286            bearer_token=self.workspace.bearer_token,
287        )
288        sync_result = SyncResult(
289            workspace=self.workspace,
290            connection=self,
291            job_id=connection_response.job_id,
292        )
293
294        if wait:
295            sync_result.wait_for_completion(
296                wait_timeout=wait_timeout,
297                raise_failure=True,
298                raise_timeout=True,
299            )
300
301        return sync_result
302
303    def __repr__(self) -> str:
304        """String representation of the connection."""
305        return (
306            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
307            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
308        )
309
310    # Logs
311
312    def get_previous_sync_logs(
313        self,
314        *,
315        limit: int = 20,
316        offset: int | None = None,
317        from_tail: bool = True,
318        job_type: str | JobTypeEnum | None = None,
319    ) -> list[SyncResult]:
320        """Get previous sync jobs for a connection with pagination support.
321
322        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
323        rows_synced, start_time). Full log text can be fetched lazily via
324        `SyncResult.get_full_log_text()`.
325
326        Args:
327            limit: Maximum number of jobs to return. Defaults to 20.
328            offset: Number of jobs to skip from the beginning. Defaults to None (0).
329            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
330                If False, returns jobs ordered oldest-first (createdAt ASC).
331                Defaults to True.
332            job_type: Filter by job type (e.g., `sync`, `refresh`).
333                If not specified, defaults to sync and reset jobs only (API default behavior).
334
335        Returns:
336            A list of SyncResult objects representing the sync jobs.
337        """
338        order_by = (
339            api_util.JOB_ORDER_BY_CREATED_AT_DESC
340            if from_tail
341            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
342        )
343        sync_logs = api_util.get_job_logs(
344            connection_id=self.connection_id,
345            api_root=self.workspace.api_root,
346            workspace_id=self.workspace.workspace_id,
347            limit=limit,
348            offset=offset,
349            order_by=order_by,
350            job_type=job_type,
351            client_id=self.workspace.client_id,
352            client_secret=self.workspace.client_secret,
353            bearer_token=self.workspace.bearer_token,
354        )
355        return [
356            SyncResult(
357                workspace=self.workspace,
358                connection=self,
359                job_id=sync_log.job_id,
360                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
361            )
362            for sync_log in sync_logs
363        ]
364
365    def get_sync_result(
366        self,
367        job_id: int | None = None,
368    ) -> SyncResult | None:
369        """Get the sync result for the connection.
370
371        If `job_id` is not provided, the most recent sync job will be used.
372
373        Returns `None` if job_id is omitted and no previous jobs are found.
374        """
375        if job_id is None:
376            # Get the most recent sync job
377            results = self.get_previous_sync_logs(
378                limit=1,
379            )
380            if results:
381                return results[0]
382
383            return None
384
385        # Get the sync job by ID (lazy loaded)
386        return SyncResult(
387            workspace=self.workspace,
388            connection=self,
389            job_id=job_id,
390        )
391
392    # Artifacts
393
394    @deprecated("Use 'dump_raw_state()' instead.")
395    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
396        """Deprecated. Use `dump_raw_state()` instead."""
397        state_response = api_util.get_connection_state(
398            connection_id=self.connection_id,
399            api_root=self.workspace.api_root,
400            client_id=self.workspace.client_id,
401            client_secret=self.workspace.client_secret,
402            bearer_token=self.workspace.bearer_token,
403            config_api_root=self.workspace.config_api_root,
404        )
405        if state_response.get("stateType") == "not_set":
406            return None
407        return state_response.get("streamState", [])
408
409    @overload
410    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...
411
412    @overload
413    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...
414
415    def dump_raw_state(
416        self,
417        *,
418        normalize: bool = True,
419    ) -> dict[str, Any] | list[dict[str, Any]]:
420        """Dump the state for this connection.
421
422        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
423        with snake_case keys, suitable for passing to a connector's `--state` flag.
424
425        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
426        includes `stateType` and `connectionId`). This raw format can be passed
427        directly to `import_raw_state()` for backup/restore workflows.
428
429        Args:
430            normalize: If `True` (default), convert to Airbyte protocol format.
431                If `False`, return the raw Config API response.
432
433        Returns:
434            Normalized: list of protocol-format state message dicts (empty list if
435            no state). Raw: the full Config API state dict.
436        """
437        raw = api_util.get_connection_state(
438            connection_id=self.connection_id,
439            api_root=self.workspace.api_root,
440            client_id=self.workspace.client_id,
441            client_secret=self.workspace.client_secret,
442            bearer_token=self.workspace.bearer_token,
443            config_api_root=self.workspace.config_api_root,
444        )
445        if normalize:
446            return _normalize_state_to_protocol(raw)
447        return raw
448
449    def import_raw_state(
450        self,
451        connection_state: dict[str, Any] | list[dict[str, Any]],
452    ) -> dict[str, Any]:
453        """Import (restore) the full state for this connection.
454
455        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
456        > could result in broken connections, and/or incorrect sync behavior.
457
458        Replaces the entire connection state with the provided state blob.
459        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
460
461        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
462        The `connectionId` in the blob is always overridden with this connection's
463        ID, making state blobs portable across connections.
464
465        Accepts either format:
466
467        - **Config API format** (dict with `stateType`): passed through directly.
468        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
469          converted to Config API format before sending.
470
471        Args:
472            connection_state: Connection state in either Config API or Airbyte protocol format.
473
474        Returns:
475            The updated connection state as a dictionary.
476
477        Raises:
478            AirbyteConnectionSyncActiveError: If a sync is currently running on this
479                connection (HTTP 423). Wait for the sync to complete before retrying.
480        """
481        api_state: dict[str, Any]
482        if isinstance(connection_state, list):
483            if not _is_protocol_state_format(connection_state):
484                msg = (
485                    "Expected connection_state list to contain Airbyte protocol state "
486                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
487                    "or LEGACY). Got a list that does not match protocol format."
488                )
489                raise ValueError(msg)
490            api_state = _denormalize_protocol_state_to_api(
491                protocol_messages=connection_state,
492                connection_id=self.connection_id,
493            )
494        elif isinstance(connection_state, dict):
495            if _is_protocol_state_format(connection_state):
496                api_state = _denormalize_protocol_state_to_api(
497                    protocol_messages=[connection_state],
498                    connection_id=self.connection_id,
499                )
500            else:
501                api_state = connection_state
502        else:
503            msg = f"Expected a dict or list, got {type(connection_state)}"
504            raise TypeError(msg)
505
506        return api_util.replace_connection_state(
507            connection_id=self.connection_id,
508            connection_state_dict=api_state,
509            api_root=self.workspace.api_root,
510            client_id=self.workspace.client_id,
511            client_secret=self.workspace.client_secret,
512            bearer_token=self.workspace.bearer_token,
513            config_api_root=self.workspace.config_api_root,
514        )
515
516    def get_stream_state(
517        self,
518        stream_name: str,
519        stream_namespace: str | None = None,
520    ) -> dict[str, Any] | None:
521        """Get the state blob for a single stream within this connection.
522
523        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
524        not the full connection state envelope.
525
526        This is compatible with `stream`-type state and stream-level entries
527        within a `global`-type state. It is not compatible with `legacy` state.
528        To get or set the entire connection-level state artifact, use
529        `dump_raw_state` and `import_raw_state` instead.
530
531        Args:
532            stream_name: The name of the stream to get state for.
533            stream_namespace: The source-side stream namespace. This refers to the
534                namespace from the source (e.g., database schema), not any destination
535                namespace override set in connection advanced settings.
536
537        Returns:
538            The stream's state blob as a dictionary, or None if the stream is not found.
539        """
540        state_data = self.dump_raw_state(normalize=False)
541        result = ConnectionStateResponse(**state_data)
542
543        streams = _get_stream_list(result)
544        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
545
546        if not matching:
547            available = [s.stream_descriptor.name for s in streams]
548            logger.warning(
549                "Stream '%s' not found in connection state for connection '%s'. "
550                "Available streams: %s",
551                stream_name,
552                self.connection_id,
553                available,
554            )
555            return None
556
557        return matching[0].stream_state
558
559    def set_stream_state(
560        self,
561        stream_name: str,
562        state_blob_dict: dict[str, Any],
563        stream_namespace: str | None = None,
564    ) -> None:
565        """Set the state for a single stream within this connection.
566
567        Fetches the current full state, replaces only the specified stream's state,
568        then sends the full updated state back to the API. If the stream does not
569        exist in the current state, it is appended.
570
571        This is compatible with `stream`-type state and stream-level entries
572        within a `global`-type state. It is not compatible with `legacy` state.
573        To get or set the entire connection-level state artifact, use
574        `dump_raw_state` and `import_raw_state` instead.
575
576        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
577
578        Args:
579            stream_name: The name of the stream to update state for.
580            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
581            stream_namespace: The source-side stream namespace. This refers to the
582                namespace from the source (e.g., database schema), not any destination
583                namespace override set in connection advanced settings.
584
585        Raises:
586            PyAirbyteInputError: If the connection state type is not supported for
587                stream-level operations (not_set, legacy).
588            AirbyteConnectionSyncActiveError: If a sync is currently running on this
589                connection (HTTP 423). Wait for the sync to complete before retrying.
590        """
591        state_data = self.dump_raw_state(normalize=False)
592        current = ConnectionStateResponse(**state_data)
593
594        if current.state_type == "not_set":
595            raise PyAirbyteInputError(
596                message="Cannot set stream state: connection has no existing state.",
597                context={"connection_id": self.connection_id},
598            )
599
600        if current.state_type == "legacy":
601            raise PyAirbyteInputError(
602                message="Cannot set stream state on a legacy-type connection state.",
603                context={"connection_id": self.connection_id},
604            )
605
606        new_stream_entry = {
607            "streamDescriptor": {
608                "name": stream_name,
609                **(
610                    {
611                        "namespace": stream_namespace,
612                    }
613                    if stream_namespace
614                    else {}
615                ),
616            },
617            "streamState": state_blob_dict,
618        }
619
620        raw_streams: list[dict[str, Any]]
621        if current.state_type == "stream":
622            raw_streams = state_data.get("streamState", [])
623        elif current.state_type == "global":
624            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
625        else:
626            raw_streams = []
627
628        streams = _get_stream_list(current)
629        found = False
630        updated_streams_raw: list[dict[str, Any]] = []
631        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
632            if _match_stream(parsed_s, stream_name, stream_namespace):
633                updated_streams_raw.append(new_stream_entry)
634                found = True
635            else:
636                updated_streams_raw.append(raw_s)
637
638        if not found:
639            updated_streams_raw.append(new_stream_entry)
640
641        full_state: dict[str, Any] = {
642            **state_data,
643        }
644
645        if current.state_type == "stream":
646            full_state["streamState"] = updated_streams_raw
647        elif current.state_type == "global":
648            original_global = state_data.get("globalState", {})
649            full_state["globalState"] = {
650                **original_global,
651                "streamStates": updated_streams_raw,
652            }
653
654        self.import_raw_state(full_state)
655
656    @deprecated("Use 'dump_raw_catalog()' instead.")
657    def get_catalog_artifact(self) -> dict[str, Any] | None:
658        """Get the configured catalog for this connection.
659
660        Returns the full configured catalog (syncCatalog) for this connection,
661        including stream schemas, sync modes, cursor fields, and primary keys.
662
663        Uses the Config API endpoint: POST /v1/web_backend/connections/get
664
665        Returns:
666            Dictionary containing the configured catalog, or `None` if not found.
667        """
668        return self.dump_raw_catalog()
669
670    def dump_raw_catalog(
671        self,
672        *,
673        normalize: bool = True,
674    ) -> dict[str, Any] | None:
675        """Dump the configured catalog for this connection.
676
677        By default, returns the catalog in Airbyte protocol format
678        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
679        to a connector's `--catalog` flag.
680
681        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
682        Config API (camelCase keys, nested `config` block). This raw format can be
683        passed directly to `import_raw_catalog()` for backup/restore workflows.
684
685        Args:
686            normalize: If `True` (default), convert to Airbyte protocol format.
687                If `False`, return the raw Config API catalog.
688
689        Returns:
690            The configured catalog dict, or `None` if not found.
691        """
692        connection_response = api_util.get_connection_catalog(
693            connection_id=self.connection_id,
694            api_root=self.workspace.api_root,
695            client_id=self.workspace.client_id,
696            client_secret=self.workspace.client_secret,
697            bearer_token=self.workspace.bearer_token,
698            config_api_root=self.workspace.config_api_root,
699        )
700        raw = connection_response.get("syncCatalog")
701        if raw is None:
702            return None
703        if normalize:
704            return _normalize_catalog_to_protocol(raw)
705        return raw
706
707    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
708        """Replace the configured catalog for this connection.
709
710        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
711        > could result in broken connections, and/or incorrect sync behavior.
712
713        Accepts a configured catalog dict and replaces the connection's entire
714        catalog with it. All other connection settings remain unchanged.
715
716        Accepts either format:
717
718        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
719          passed through directly.
720        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
721          automatically converted to Config API format before sending.
722
723        Args:
724            catalog: The configured catalog dict in either format.
725        """
726        if _is_protocol_catalog_format(catalog):
727            catalog = _denormalize_catalog_to_api(catalog)
728
729        api_util.replace_connection_catalog(
730            connection_id=self.connection_id,
731            configured_catalog_dict=catalog,
732            api_root=self.workspace.api_root,
733            client_id=self.workspace.client_id,
734            client_secret=self.workspace.client_secret,
735            bearer_token=self.workspace.bearer_token,
736            config_api_root=self.workspace.config_api_root,
737        )
738
739    def rename(self, name: str) -> CloudConnection:
740        """Rename the connection.
741
742        Args:
743            name: New name for the connection
744
745        Returns:
746            Updated CloudConnection object with refreshed info
747        """
748        updated_response = api_util.patch_connection(
749            connection_id=self.connection_id,
750            api_root=self.workspace.api_root,
751            client_id=self.workspace.client_id,
752            client_secret=self.workspace.client_secret,
753            bearer_token=self.workspace.bearer_token,
754            name=name,
755        )
756        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
757        return self
758
759    def set_table_prefix(self, prefix: str) -> CloudConnection:
760        """Set the table prefix for the connection.
761
762        Args:
763            prefix: New table prefix to use when syncing to the destination
764
765        Returns:
766            Updated CloudConnection object with refreshed info
767        """
768        updated_response = api_util.patch_connection(
769            connection_id=self.connection_id,
770            api_root=self.workspace.api_root,
771            client_id=self.workspace.client_id,
772            client_secret=self.workspace.client_secret,
773            bearer_token=self.workspace.bearer_token,
774            prefix=prefix,
775        )
776        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
777        return self
778
779    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
780        """Set the selected streams for the connection.
781
782        This is a destructive operation that can break existing connections if the
783        stream selection is changed incorrectly. Use with caution.
784
785        Args:
786            stream_names: List of stream names to sync
787
788        Returns:
789            Updated CloudConnection object with refreshed info
790        """
791        configurations = api_util.build_stream_configurations(stream_names)
792
793        updated_response = api_util.patch_connection(
794            connection_id=self.connection_id,
795            api_root=self.workspace.api_root,
796            client_id=self.workspace.client_id,
797            client_secret=self.workspace.client_secret,
798            bearer_token=self.workspace.bearer_token,
799            configurations=configurations,
800        )
801        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
802        return self
803
804    # Enable/Disable
805
806    @property
807    def enabled(self) -> bool:
808        """Get the current enabled status of the connection.
809
810        This property always fetches fresh data from the API to ensure accuracy,
811        as another process or user may have toggled the setting.
812
813        Returns:
814            True if the connection status is 'active', False otherwise.
815        """
816        connection_info = self._fetch_connection_info(force_refresh=True)
817        return connection_info.status == "active"
818
819    @enabled.setter
820    def enabled(self, value: bool) -> None:
821        """Set the enabled status of the connection.
822
823        Args:
824            value: True to enable (set status to 'active'), False to disable
825                (set status to 'inactive').
826        """
827        self.set_enabled(enabled=value)
828
829    def set_enabled(
830        self,
831        *,
832        enabled: bool,
833        ignore_noop: bool = True,
834    ) -> None:
835        """Set the enabled status of the connection.
836
837        Args:
838            enabled: True to enable (set status to 'active'), False to disable
839                (set status to 'inactive').
840            ignore_noop: If True (default), silently return if the connection is already
841                in the requested state. If False, raise ValueError when the requested
842                state matches the current state.
843
844        Raises:
845            ValueError: If ignore_noop is False and the connection is already in the
846                requested state.
847        """
848        # Always fetch fresh data to check current status
849        connection_info = self._fetch_connection_info(force_refresh=True)
850        current_status = connection_info.status
851        desired_status = "active" if enabled else "inactive"
852
853        if current_status == desired_status:
854            if ignore_noop:
855                return
856            raise ValueError(
857                f"Connection is already {'enabled' if enabled else 'disabled'}. "
858                f"Current status: {current_status}"
859            )
860
861        updated_response = api_util.patch_connection(
862            connection_id=self.connection_id,
863            api_root=self.workspace.api_root,
864            client_id=self.workspace.client_id,
865            client_secret=self.workspace.client_secret,
866            bearer_token=self.workspace.bearer_token,
867            status=desired_status,
868        )
869        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
870
871    # Scheduling
872
873    def set_schedule(
874        self,
875        cron_expression: str,
876    ) -> None:
877        """Set a cron schedule for the connection.
878
879        Args:
880            cron_expression: A cron expression defining when syncs should run.
881
882        Examples:
883                - "0 0 * * *" - Daily at midnight UTC
884                - "0 */6 * * *" - Every 6 hours
885                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
886        """
887        updated_response = api_util.patch_connection(
888            connection_id=self.connection_id,
889            api_root=self.workspace.api_root,
890            client_id=self.workspace.client_id,
891            client_secret=self.workspace.client_secret,
892            bearer_token=self.workspace.bearer_token,
893            schedule=api_util.build_connection_schedule(
894                schedule_type="cron",
895                cron_expression=cron_expression,
896            ),
897        )
898        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
899
900    def set_manual_schedule(self) -> None:
901        """Set the connection to manual scheduling.
902
903        Disables automatic syncs. Syncs will only run when manually triggered.
904        """
905        updated_response = api_util.patch_connection(
906            connection_id=self.connection_id,
907            api_root=self.workspace.api_root,
908            client_id=self.workspace.client_id,
909            client_secret=self.workspace.client_secret,
910            bearer_token=self.workspace.bearer_token,
911            schedule=api_util.build_connection_schedule(schedule_type="manual"),
912        )
913        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
914
915    # Deletions
916
917    def permanently_delete(
918        self,
919        *,
920        cascade_delete_source: bool = False,
921        cascade_delete_destination: bool = False,
922    ) -> None:
923        """Delete the connection.
924
925        Args:
926            cascade_delete_source: Whether to also delete the source.
927            cascade_delete_destination: Whether to also delete the destination.
928        """
929        self.workspace.permanently_delete_connection(self)
930
931        if cascade_delete_source:
932            self.workspace.permanently_delete_source(self.source_id)
933
934        if cascade_delete_destination:
935            self.workspace.permanently_delete_destination(self.destination_id)
logger = <Logger airbyte.cloud.connections (INFO)>
class CloudConnection:
 44class CloudConnection:  # noqa: PLR0904  # Too many public methods
 45    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 46
 47    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 48    """
 49
 50    def __init__(
 51        self,
 52        workspace: CloudWorkspace,
 53        connection_id: str,
 54        source: str | None = None,
 55        destination: str | None = None,
 56    ) -> None:
 57        """It is not recommended to create a `CloudConnection` object directly.
 58
 59        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 60        """
 61        self.connection_id = connection_id
 62        """The ID of the connection."""
 63
 64        self.workspace = workspace
 65        """The workspace that the connection belongs to."""
 66
 67        self._source_id = source
 68        """The ID of the source."""
 69
 70        self._destination_id = destination
 71        """The ID of the destination."""
 72
 73        self._connection_info: CloudConnectionInfo | None = None
 74        """The connection info object. (Cached.)"""
 75
 76        self._cloud_source_object: CloudSource | None = None
 77        """The source object. (Cached.)"""
 78
 79        self._cloud_destination_object: CloudDestination | None = None
 80        """The destination object. (Cached.)"""
 81
 82    def _fetch_connection_info(
 83        self,
 84        *,
 85        force_refresh: bool = False,
 86        verify: bool = True,
 87    ) -> CloudConnectionInfo:
 88        """Fetch and cache connection info from the API.
 89
 90        By default, this method will only fetch from the API if connection info is not
 91        already cached. It also verifies that the connection belongs to the expected
 92        workspace unless verification is explicitly disabled.
 93
 94        Args:
 95            force_refresh: If True, always fetch from the API even if cached.
 96                If False (default), only fetch if not already cached.
 97            verify: If True (default), verify that the connection is valid (e.g., that
 98                the workspace_id matches this object's workspace). Raises an error if
 99                validation fails.
100
101        Returns:
102            Information about the connection from the API.
103
104        Raises:
105            AirbyteWorkspaceMismatchError: If verify is True and the connection's
106                workspace_id doesn't match the expected workspace.
107            AirbyteMissingResourceError: If the connection doesn't exist.
108        """
109        if not force_refresh and self._connection_info is not None:
110            # Use cached info, but still verify if requested
111            if verify:
112                self._verify_workspace_match(self._connection_info)
113            return self._connection_info
114
115        # Fetch from API
116        connection_info = api_util.get_connection(
117            workspace_id=self.workspace.workspace_id,
118            connection_id=self.connection_id,
119            api_root=self.workspace.api_root,
120            client_id=self.workspace.client_id,
121            client_secret=self.workspace.client_secret,
122            bearer_token=self.workspace.bearer_token,
123        )
124        result = CloudConnectionInfo.from_api_response(connection_info)
125
126        self._connection_info = result
127
128        # Verify if requested
129        if verify:
130            self._verify_workspace_match(result)
131
132        return result
133
134    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
135        """Verify that the connection belongs to the expected workspace.
136
137        Raises:
138            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
139        """
140        if connection_info.workspace_id != self.workspace.workspace_id:
141            raise AirbyteWorkspaceMismatchError(
142                resource_type="connection",
143                resource_id=self.connection_id,
144                workspace=self.workspace,
145                expected_workspace_id=self.workspace.workspace_id,
146                actual_workspace_id=connection_info.workspace_id,
147                message=(
148                    f"Connection '{self.connection_id}' belongs to workspace "
149                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
150                ),
151            )
152
153    def check_is_valid(self) -> bool:
154        """Check if this connection exists and belongs to the expected workspace.
155
156        This method fetches connection info from the API (if not already cached) and
157        verifies that the connection's workspace_id matches the workspace associated
158        with this CloudConnection object.
159
160        Returns:
161            True if the connection exists and belongs to the expected workspace.
162
163        Raises:
164            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
165            AirbyteMissingResourceError: If the connection doesn't exist.
166        """
167        self._fetch_connection_info(force_refresh=False, verify=True)
168        return True
169
170    @classmethod
171    def _from_connection_response(
172        cls,
173        workspace: CloudWorkspace,
174        connection_response: _ConnectionResponseLike,
175    ) -> CloudConnection:
176        """Create a CloudConnection from an API connection response."""
177        connection_info = CloudConnectionInfo.from_api_response(connection_response)
178        result = cls(
179            workspace=workspace,
180            connection_id=connection_info.connection_id,
181            source=connection_info.source_id,
182            destination=connection_info.destination_id,
183        )
184        result._connection_info = connection_info  # noqa: SLF001 # Accessing Non-Public API
185        return result
186
187    # Properties
188
189    @property
190    def name(self) -> str | None:
191        """Get the display name of the connection, if available.
192
193        E.g. "My Postgres to Snowflake", not the connection ID.
194        """
195        if not self._connection_info:
196            self._connection_info = self._fetch_connection_info()
197
198        return self._connection_info.name
199
200    @property
201    def source_id(self) -> str:
202        """The ID of the source."""
203        if not self._source_id:
204            if not self._connection_info:
205                self._connection_info = self._fetch_connection_info()
206
207            self._source_id = self._connection_info.source_id
208
209        return self._source_id
210
211    @property
212    def source(self) -> CloudSource:
213        """Get the source object."""
214        if self._cloud_source_object:
215            return self._cloud_source_object
216
217        self._cloud_source_object = CloudSource(
218            workspace=self.workspace,
219            connector_id=self.source_id,
220        )
221        return self._cloud_source_object
222
223    @property
224    def destination_id(self) -> str:
225        """The ID of the destination."""
226        if not self._destination_id:
227            if not self._connection_info:
228                self._connection_info = self._fetch_connection_info()
229
230            self._destination_id = self._connection_info.destination_id
231
232        return self._destination_id
233
234    @property
235    def destination(self) -> CloudDestination:
236        """Get the destination object."""
237        if self._cloud_destination_object:
238            return self._cloud_destination_object
239
240        self._cloud_destination_object = CloudDestination(
241            workspace=self.workspace,
242            connector_id=self.destination_id,
243        )
244        return self._cloud_destination_object
245
246    @property
247    def stream_names(self) -> list[str]:
248        """The stream names."""
249        if not self._connection_info:
250            self._connection_info = self._fetch_connection_info()
251
252        return [stream.name for stream in self._connection_info.configurations.streams or []]
253
254    @property
255    def table_prefix(self) -> str:
256        """The table prefix."""
257        if not self._connection_info:
258            self._connection_info = self._fetch_connection_info()
259
260        return self._connection_info.prefix or ""
261
262    @property
263    def connection_url(self) -> str | None:
264        """The web URL to the connection."""
265        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
266
267    @property
268    def job_history_url(self) -> str | None:
269        """The URL to the job history for the connection."""
270        return f"{self.connection_url}/timeline"
271
272    # Run Sync
273
274    def run_sync(
275        self,
276        *,
277        wait: bool = True,
278        wait_timeout: int = 300,
279    ) -> SyncResult:
280        """Run a sync."""
281        connection_response = api_util.run_connection(
282            connection_id=self.connection_id,
283            api_root=self.workspace.api_root,
284            workspace_id=self.workspace.workspace_id,
285            client_id=self.workspace.client_id,
286            client_secret=self.workspace.client_secret,
287            bearer_token=self.workspace.bearer_token,
288        )
289        sync_result = SyncResult(
290            workspace=self.workspace,
291            connection=self,
292            job_id=connection_response.job_id,
293        )
294
295        if wait:
296            sync_result.wait_for_completion(
297                wait_timeout=wait_timeout,
298                raise_failure=True,
299                raise_timeout=True,
300            )
301
302        return sync_result
303
304    def __repr__(self) -> str:
305        """String representation of the connection."""
306        return (
307            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
308            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
309        )
310
311    # Logs
312
313    def get_previous_sync_logs(
314        self,
315        *,
316        limit: int = 20,
317        offset: int | None = None,
318        from_tail: bool = True,
319        job_type: str | JobTypeEnum | None = None,
320    ) -> list[SyncResult]:
321        """Get previous sync jobs for a connection with pagination support.
322
323        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
324        rows_synced, start_time). Full log text can be fetched lazily via
325        `SyncResult.get_full_log_text()`.
326
327        Args:
328            limit: Maximum number of jobs to return. Defaults to 20.
329            offset: Number of jobs to skip from the beginning. Defaults to None (0).
330            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
331                If False, returns jobs ordered oldest-first (createdAt ASC).
332                Defaults to True.
333            job_type: Filter by job type (e.g., `sync`, `refresh`).
334                If not specified, defaults to sync and reset jobs only (API default behavior).
335
336        Returns:
337            A list of SyncResult objects representing the sync jobs.
338        """
339        order_by = (
340            api_util.JOB_ORDER_BY_CREATED_AT_DESC
341            if from_tail
342            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
343        )
344        sync_logs = api_util.get_job_logs(
345            connection_id=self.connection_id,
346            api_root=self.workspace.api_root,
347            workspace_id=self.workspace.workspace_id,
348            limit=limit,
349            offset=offset,
350            order_by=order_by,
351            job_type=job_type,
352            client_id=self.workspace.client_id,
353            client_secret=self.workspace.client_secret,
354            bearer_token=self.workspace.bearer_token,
355        )
356        return [
357            SyncResult(
358                workspace=self.workspace,
359                connection=self,
360                job_id=sync_log.job_id,
361                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
362            )
363            for sync_log in sync_logs
364        ]
365
366    def get_sync_result(
367        self,
368        job_id: int | None = None,
369    ) -> SyncResult | None:
370        """Get the sync result for the connection.
371
372        If `job_id` is not provided, the most recent sync job will be used.
373
374        Returns `None` if job_id is omitted and no previous jobs are found.
375        """
376        if job_id is None:
377            # Get the most recent sync job
378            results = self.get_previous_sync_logs(
379                limit=1,
380            )
381            if results:
382                return results[0]
383
384            return None
385
386        # Get the sync job by ID (lazy loaded)
387        return SyncResult(
388            workspace=self.workspace,
389            connection=self,
390            job_id=job_id,
391        )
392
393    # Artifacts
394
395    @deprecated("Use 'dump_raw_state()' instead.")
396    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
397        """Deprecated. Use `dump_raw_state()` instead."""
398        state_response = api_util.get_connection_state(
399            connection_id=self.connection_id,
400            api_root=self.workspace.api_root,
401            client_id=self.workspace.client_id,
402            client_secret=self.workspace.client_secret,
403            bearer_token=self.workspace.bearer_token,
404            config_api_root=self.workspace.config_api_root,
405        )
406        if state_response.get("stateType") == "not_set":
407            return None
408        return state_response.get("streamState", [])
409
410    @overload
411    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...
412
413    @overload
414    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...
415
416    def dump_raw_state(
417        self,
418        *,
419        normalize: bool = True,
420    ) -> dict[str, Any] | list[dict[str, Any]]:
421        """Dump the state for this connection.
422
423        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
424        with snake_case keys, suitable for passing to a connector's `--state` flag.
425
426        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
427        includes `stateType` and `connectionId`). This raw format can be passed
428        directly to `import_raw_state()` for backup/restore workflows.
429
430        Args:
431            normalize: If `True` (default), convert to Airbyte protocol format.
432                If `False`, return the raw Config API response.
433
434        Returns:
435            Normalized: list of protocol-format state message dicts (empty list if
436            no state). Raw: the full Config API state dict.
437        """
438        raw = api_util.get_connection_state(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            config_api_root=self.workspace.config_api_root,
445        )
446        if normalize:
447            return _normalize_state_to_protocol(raw)
448        return raw
449
450    def import_raw_state(
451        self,
452        connection_state: dict[str, Any] | list[dict[str, Any]],
453    ) -> dict[str, Any]:
454        """Import (restore) the full state for this connection.
455
456        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
457        > could result in broken connections, and/or incorrect sync behavior.
458
459        Replaces the entire connection state with the provided state blob.
460        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
461
462        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
463        The `connectionId` in the blob is always overridden with this connection's
464        ID, making state blobs portable across connections.
465
466        Accepts either format:
467
468        - **Config API format** (dict with `stateType`): passed through directly.
469        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
470          converted to Config API format before sending.
471
472        Args:
473            connection_state: Connection state in either Config API or Airbyte protocol format.
474
475        Returns:
476            The updated connection state as a dictionary.
477
478        Raises:
479            AirbyteConnectionSyncActiveError: If a sync is currently running on this
480                connection (HTTP 423). Wait for the sync to complete before retrying.
481        """
482        api_state: dict[str, Any]
483        if isinstance(connection_state, list):
484            if not _is_protocol_state_format(connection_state):
485                msg = (
486                    "Expected connection_state list to contain Airbyte protocol state "
487                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
488                    "or LEGACY). Got a list that does not match protocol format."
489                )
490                raise ValueError(msg)
491            api_state = _denormalize_protocol_state_to_api(
492                protocol_messages=connection_state,
493                connection_id=self.connection_id,
494            )
495        elif isinstance(connection_state, dict):
496            if _is_protocol_state_format(connection_state):
497                api_state = _denormalize_protocol_state_to_api(
498                    protocol_messages=[connection_state],
499                    connection_id=self.connection_id,
500                )
501            else:
502                api_state = connection_state
503        else:
504            msg = f"Expected a dict or list, got {type(connection_state)}"
505            raise TypeError(msg)
506
507        return api_util.replace_connection_state(
508            connection_id=self.connection_id,
509            connection_state_dict=api_state,
510            api_root=self.workspace.api_root,
511            client_id=self.workspace.client_id,
512            client_secret=self.workspace.client_secret,
513            bearer_token=self.workspace.bearer_token,
514            config_api_root=self.workspace.config_api_root,
515        )
516
517    def get_stream_state(
518        self,
519        stream_name: str,
520        stream_namespace: str | None = None,
521    ) -> dict[str, Any] | None:
522        """Get the state blob for a single stream within this connection.
523
524        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
525        not the full connection state envelope.
526
527        This is compatible with `stream`-type state and stream-level entries
528        within a `global`-type state. It is not compatible with `legacy` state.
529        To get or set the entire connection-level state artifact, use
530        `dump_raw_state` and `import_raw_state` instead.
531
532        Args:
533            stream_name: The name of the stream to get state for.
534            stream_namespace: The source-side stream namespace. This refers to the
535                namespace from the source (e.g., database schema), not any destination
536                namespace override set in connection advanced settings.
537
538        Returns:
539            The stream's state blob as a dictionary, or None if the stream is not found.
540        """
541        state_data = self.dump_raw_state(normalize=False)
542        result = ConnectionStateResponse(**state_data)
543
544        streams = _get_stream_list(result)
545        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
546
547        if not matching:
548            available = [s.stream_descriptor.name for s in streams]
549            logger.warning(
550                "Stream '%s' not found in connection state for connection '%s'. "
551                "Available streams: %s",
552                stream_name,
553                self.connection_id,
554                available,
555            )
556            return None
557
558        return matching[0].stream_state
559
560    def set_stream_state(
561        self,
562        stream_name: str,
563        state_blob_dict: dict[str, Any],
564        stream_namespace: str | None = None,
565    ) -> None:
566        """Set the state for a single stream within this connection.
567
568        Fetches the current full state, replaces only the specified stream's state,
569        then sends the full updated state back to the API. If the stream does not
570        exist in the current state, it is appended.
571
572        This is compatible with `stream`-type state and stream-level entries
573        within a `global`-type state. It is not compatible with `legacy` state.
574        To get or set the entire connection-level state artifact, use
575        `dump_raw_state` and `import_raw_state` instead.
576
577        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
578
579        Args:
580            stream_name: The name of the stream to update state for.
581            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
582            stream_namespace: The source-side stream namespace. This refers to the
583                namespace from the source (e.g., database schema), not any destination
584                namespace override set in connection advanced settings.
585
586        Raises:
587            PyAirbyteInputError: If the connection state type is not supported for
588                stream-level operations (not_set, legacy).
589            AirbyteConnectionSyncActiveError: If a sync is currently running on this
590                connection (HTTP 423). Wait for the sync to complete before retrying.
591        """
592        state_data = self.dump_raw_state(normalize=False)
593        current = ConnectionStateResponse(**state_data)
594
595        if current.state_type == "not_set":
596            raise PyAirbyteInputError(
597                message="Cannot set stream state: connection has no existing state.",
598                context={"connection_id": self.connection_id},
599            )
600
601        if current.state_type == "legacy":
602            raise PyAirbyteInputError(
603                message="Cannot set stream state on a legacy-type connection state.",
604                context={"connection_id": self.connection_id},
605            )
606
607        new_stream_entry = {
608            "streamDescriptor": {
609                "name": stream_name,
610                **(
611                    {
612                        "namespace": stream_namespace,
613                    }
614                    if stream_namespace
615                    else {}
616                ),
617            },
618            "streamState": state_blob_dict,
619        }
620
621        raw_streams: list[dict[str, Any]]
622        if current.state_type == "stream":
623            raw_streams = state_data.get("streamState", [])
624        elif current.state_type == "global":
625            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
626        else:
627            raw_streams = []
628
629        streams = _get_stream_list(current)
630        found = False
631        updated_streams_raw: list[dict[str, Any]] = []
632        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
633            if _match_stream(parsed_s, stream_name, stream_namespace):
634                updated_streams_raw.append(new_stream_entry)
635                found = True
636            else:
637                updated_streams_raw.append(raw_s)
638
639        if not found:
640            updated_streams_raw.append(new_stream_entry)
641
642        full_state: dict[str, Any] = {
643            **state_data,
644        }
645
646        if current.state_type == "stream":
647            full_state["streamState"] = updated_streams_raw
648        elif current.state_type == "global":
649            original_global = state_data.get("globalState", {})
650            full_state["globalState"] = {
651                **original_global,
652                "streamStates": updated_streams_raw,
653            }
654
655        self.import_raw_state(full_state)
656
657    @deprecated("Use 'dump_raw_catalog()' instead.")
658    def get_catalog_artifact(self) -> dict[str, Any] | None:
659        """Get the configured catalog for this connection.
660
661        Returns the full configured catalog (syncCatalog) for this connection,
662        including stream schemas, sync modes, cursor fields, and primary keys.
663
664        Uses the Config API endpoint: POST /v1/web_backend/connections/get
665
666        Returns:
667            Dictionary containing the configured catalog, or `None` if not found.
668        """
669        return self.dump_raw_catalog()
670
671    def dump_raw_catalog(
672        self,
673        *,
674        normalize: bool = True,
675    ) -> dict[str, Any] | None:
676        """Dump the configured catalog for this connection.
677
678        By default, returns the catalog in Airbyte protocol format
679        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
680        to a connector's `--catalog` flag.
681
682        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
683        Config API (camelCase keys, nested `config` block). This raw format can be
684        passed directly to `import_raw_catalog()` for backup/restore workflows.
685
686        Args:
687            normalize: If `True` (default), convert to Airbyte protocol format.
688                If `False`, return the raw Config API catalog.
689
690        Returns:
691            The configured catalog dict, or `None` if not found.
692        """
693        connection_response = api_util.get_connection_catalog(
694            connection_id=self.connection_id,
695            api_root=self.workspace.api_root,
696            client_id=self.workspace.client_id,
697            client_secret=self.workspace.client_secret,
698            bearer_token=self.workspace.bearer_token,
699            config_api_root=self.workspace.config_api_root,
700        )
701        raw = connection_response.get("syncCatalog")
702        if raw is None:
703            return None
704        if normalize:
705            return _normalize_catalog_to_protocol(raw)
706        return raw
707
708    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
709        """Replace the configured catalog for this connection.
710
711        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
712        > could result in broken connections, and/or incorrect sync behavior.
713
714        Accepts a configured catalog dict and replaces the connection's entire
715        catalog with it. All other connection settings remain unchanged.
716
717        Accepts either format:
718
719        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
720          passed through directly.
721        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
722          automatically converted to Config API format before sending.
723
724        Args:
725            catalog: The configured catalog dict in either format.
726        """
727        if _is_protocol_catalog_format(catalog):
728            catalog = _denormalize_catalog_to_api(catalog)
729
730        api_util.replace_connection_catalog(
731            connection_id=self.connection_id,
732            configured_catalog_dict=catalog,
733            api_root=self.workspace.api_root,
734            client_id=self.workspace.client_id,
735            client_secret=self.workspace.client_secret,
736            bearer_token=self.workspace.bearer_token,
737            config_api_root=self.workspace.config_api_root,
738        )
739
740    def rename(self, name: str) -> CloudConnection:
741        """Rename the connection.
742
743        Args:
744            name: New name for the connection
745
746        Returns:
747            Updated CloudConnection object with refreshed info
748        """
749        updated_response = api_util.patch_connection(
750            connection_id=self.connection_id,
751            api_root=self.workspace.api_root,
752            client_id=self.workspace.client_id,
753            client_secret=self.workspace.client_secret,
754            bearer_token=self.workspace.bearer_token,
755            name=name,
756        )
757        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
758        return self
759
760    def set_table_prefix(self, prefix: str) -> CloudConnection:
761        """Set the table prefix for the connection.
762
763        Args:
764            prefix: New table prefix to use when syncing to the destination
765
766        Returns:
767            Updated CloudConnection object with refreshed info
768        """
769        updated_response = api_util.patch_connection(
770            connection_id=self.connection_id,
771            api_root=self.workspace.api_root,
772            client_id=self.workspace.client_id,
773            client_secret=self.workspace.client_secret,
774            bearer_token=self.workspace.bearer_token,
775            prefix=prefix,
776        )
777        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
778        return self
779
780    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
781        """Set the selected streams for the connection.
782
783        This is a destructive operation that can break existing connections if the
784        stream selection is changed incorrectly. Use with caution.
785
786        Args:
787            stream_names: List of stream names to sync
788
789        Returns:
790            Updated CloudConnection object with refreshed info
791        """
792        configurations = api_util.build_stream_configurations(stream_names)
793
794        updated_response = api_util.patch_connection(
795            connection_id=self.connection_id,
796            api_root=self.workspace.api_root,
797            client_id=self.workspace.client_id,
798            client_secret=self.workspace.client_secret,
799            bearer_token=self.workspace.bearer_token,
800            configurations=configurations,
801        )
802        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
803        return self
804
805    # Enable/Disable
806
807    @property
808    def enabled(self) -> bool:
809        """Get the current enabled status of the connection.
810
811        This property always fetches fresh data from the API to ensure accuracy,
812        as another process or user may have toggled the setting.
813
814        Returns:
815            True if the connection status is 'active', False otherwise.
816        """
817        connection_info = self._fetch_connection_info(force_refresh=True)
818        return connection_info.status == "active"
819
820    @enabled.setter
821    def enabled(self, value: bool) -> None:
822        """Set the enabled status of the connection.
823
824        Args:
825            value: True to enable (set status to 'active'), False to disable
826                (set status to 'inactive').
827        """
828        self.set_enabled(enabled=value)
829
830    def set_enabled(
831        self,
832        *,
833        enabled: bool,
834        ignore_noop: bool = True,
835    ) -> None:
836        """Set the enabled status of the connection.
837
838        Args:
839            enabled: True to enable (set status to 'active'), False to disable
840                (set status to 'inactive').
841            ignore_noop: If True (default), silently return if the connection is already
842                in the requested state. If False, raise ValueError when the requested
843                state matches the current state.
844
845        Raises:
846            ValueError: If ignore_noop is False and the connection is already in the
847                requested state.
848        """
849        # Always fetch fresh data to check current status
850        connection_info = self._fetch_connection_info(force_refresh=True)
851        current_status = connection_info.status
852        desired_status = "active" if enabled else "inactive"
853
854        if current_status == desired_status:
855            if ignore_noop:
856                return
857            raise ValueError(
858                f"Connection is already {'enabled' if enabled else 'disabled'}. "
859                f"Current status: {current_status}"
860            )
861
862        updated_response = api_util.patch_connection(
863            connection_id=self.connection_id,
864            api_root=self.workspace.api_root,
865            client_id=self.workspace.client_id,
866            client_secret=self.workspace.client_secret,
867            bearer_token=self.workspace.bearer_token,
868            status=desired_status,
869        )
870        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
871
872    # Scheduling
873
874    def set_schedule(
875        self,
876        cron_expression: str,
877    ) -> None:
878        """Set a cron schedule for the connection.
879
880        Args:
881            cron_expression: A cron expression defining when syncs should run.
882
883        Examples:
884                - "0 0 * * *" - Daily at midnight UTC
885                - "0 */6 * * *" - Every 6 hours
886                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
887        """
888        updated_response = api_util.patch_connection(
889            connection_id=self.connection_id,
890            api_root=self.workspace.api_root,
891            client_id=self.workspace.client_id,
892            client_secret=self.workspace.client_secret,
893            bearer_token=self.workspace.bearer_token,
894            schedule=api_util.build_connection_schedule(
895                schedule_type="cron",
896                cron_expression=cron_expression,
897            ),
898        )
899        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
900
901    def set_manual_schedule(self) -> None:
902        """Set the connection to manual scheduling.
903
904        Disables automatic syncs. Syncs will only run when manually triggered.
905        """
906        updated_response = api_util.patch_connection(
907            connection_id=self.connection_id,
908            api_root=self.workspace.api_root,
909            client_id=self.workspace.client_id,
910            client_secret=self.workspace.client_secret,
911            bearer_token=self.workspace.bearer_token,
912            schedule=api_util.build_connection_schedule(schedule_type="manual"),
913        )
914        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
915
916    # Deletions
917
918    def permanently_delete(
919        self,
920        *,
921        cascade_delete_source: bool = False,
922        cascade_delete_destination: bool = False,
923    ) -> None:
924        """Delete the connection.
925
926        Args:
927            cascade_delete_source: Whether to also delete the source.
928            cascade_delete_destination: Whether to also delete the destination.
929        """
930        self.workspace.permanently_delete_connection(self)
931
932        if cascade_delete_source:
933            self.workspace.permanently_delete_source(self.source_id)
934
935        if cascade_delete_destination:
936            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
50    def __init__(
51        self,
52        workspace: CloudWorkspace,
53        connection_id: str,
54        source: str | None = None,
55        destination: str | None = None,
56    ) -> None:
57        """It is not recommended to create a `CloudConnection` object directly.
58
59        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
60        """
61        self.connection_id = connection_id
62        """The ID of the connection."""
63
64        self.workspace = workspace
65        """The workspace that the connection belongs to."""
66
67        self._source_id = source
68        """The ID of the source."""
69
70        self._destination_id = destination
71        """The ID of the destination."""
72
73        self._connection_info: CloudConnectionInfo | None = None
74        """The connection info object. (Cached.)"""
75
76        self._cloud_source_object: CloudSource | None = None
77        """The source object. (Cached.)"""
78
79        self._cloud_destination_object: CloudDestination | None = None
80        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
153    def check_is_valid(self) -> bool:
154        """Check if this connection exists and belongs to the expected workspace.
155
156        This method fetches connection info from the API (if not already cached) and
157        verifies that the connection's workspace_id matches the workspace associated
158        with this CloudConnection object.
159
160        Returns:
161            True if the connection exists and belongs to the expected workspace.
162
163        Raises:
164            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
165            AirbyteMissingResourceError: If the connection doesn't exist.
166        """
167        self._fetch_connection_info(force_refresh=False, verify=True)
168        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
name: str | None
189    @property
190    def name(self) -> str | None:
191        """Get the display name of the connection, if available.
192
193        E.g. "My Postgres to Snowflake", not the connection ID.
194        """
195        if not self._connection_info:
196            self._connection_info = self._fetch_connection_info()
197
198        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
200    @property
201    def source_id(self) -> str:
202        """The ID of the source."""
203        if not self._source_id:
204            if not self._connection_info:
205                self._connection_info = self._fetch_connection_info()
206
207            self._source_id = self._connection_info.source_id
208
209        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
211    @property
212    def source(self) -> CloudSource:
213        """Get the source object."""
214        if self._cloud_source_object:
215            return self._cloud_source_object
216
217        self._cloud_source_object = CloudSource(
218            workspace=self.workspace,
219            connector_id=self.source_id,
220        )
221        return self._cloud_source_object

Get the source object.

destination_id: str
223    @property
224    def destination_id(self) -> str:
225        """The ID of the destination."""
226        if not self._destination_id:
227            if not self._connection_info:
228                self._connection_info = self._fetch_connection_info()
229
230            self._destination_id = self._connection_info.destination_id
231
232        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
234    @property
235    def destination(self) -> CloudDestination:
236        """Get the destination object."""
237        if self._cloud_destination_object:
238            return self._cloud_destination_object
239
240        self._cloud_destination_object = CloudDestination(
241            workspace=self.workspace,
242            connector_id=self.destination_id,
243        )
244        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
246    @property
247    def stream_names(self) -> list[str]:
248        """The stream names."""
249        if not self._connection_info:
250            self._connection_info = self._fetch_connection_info()
251
252        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
254    @property
255    def table_prefix(self) -> str:
256        """The table prefix."""
257        if not self._connection_info:
258            self._connection_info = self._fetch_connection_info()
259
260        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
262    @property
263    def connection_url(self) -> str | None:
264        """The web URL to the connection."""
265        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
267    @property
268    def job_history_url(self) -> str | None:
269        """The URL to the job history for the connection."""
270        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
274    def run_sync(
275        self,
276        *,
277        wait: bool = True,
278        wait_timeout: int = 300,
279    ) -> SyncResult:
280        """Run a sync."""
281        connection_response = api_util.run_connection(
282            connection_id=self.connection_id,
283            api_root=self.workspace.api_root,
284            workspace_id=self.workspace.workspace_id,
285            client_id=self.workspace.client_id,
286            client_secret=self.workspace.client_secret,
287            bearer_token=self.workspace.bearer_token,
288        )
289        sync_result = SyncResult(
290            workspace=self.workspace,
291            connection=self,
292            job_id=connection_response.job_id,
293        )
294
295        if wait:
296            sync_result.wait_for_completion(
297                wait_timeout=wait_timeout,
298                raise_failure=True,
299                raise_timeout=True,
300            )
301
302        return sync_result

Run a sync.

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True, job_type: str | airbyte.cloud.JobTypeEnum | None = None) -> list[airbyte.cloud.SyncResult]:
313    def get_previous_sync_logs(
314        self,
315        *,
316        limit: int = 20,
317        offset: int | None = None,
318        from_tail: bool = True,
319        job_type: str | JobTypeEnum | None = None,
320    ) -> list[SyncResult]:
321        """Get previous sync jobs for a connection with pagination support.
322
323        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
324        rows_synced, start_time). Full log text can be fetched lazily via
325        `SyncResult.get_full_log_text()`.
326
327        Args:
328            limit: Maximum number of jobs to return. Defaults to 20.
329            offset: Number of jobs to skip from the beginning. Defaults to None (0).
330            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
331                If False, returns jobs ordered oldest-first (createdAt ASC).
332                Defaults to True.
333            job_type: Filter by job type (e.g., `sync`, `refresh`).
334                If not specified, defaults to sync and reset jobs only (API default behavior).
335
336        Returns:
337            A list of SyncResult objects representing the sync jobs.
338        """
339        order_by = (
340            api_util.JOB_ORDER_BY_CREATED_AT_DESC
341            if from_tail
342            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
343        )
344        sync_logs = api_util.get_job_logs(
345            connection_id=self.connection_id,
346            api_root=self.workspace.api_root,
347            workspace_id=self.workspace.workspace_id,
348            limit=limit,
349            offset=offset,
350            order_by=order_by,
351            job_type=job_type,
352            client_id=self.workspace.client_id,
353            client_secret=self.workspace.client_secret,
354            bearer_token=self.workspace.bearer_token,
355        )
356        return [
357            SyncResult(
358                workspace=self.workspace,
359                connection=self,
360                job_id=sync_log.job_id,
361                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
362            )
363            for sync_log in sync_logs
364        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
  • job_type: Filter by job type (e.g., sync, refresh). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:

A list of SyncResult objects representing the sync jobs.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
366    def get_sync_result(
367        self,
368        job_id: int | None = None,
369    ) -> SyncResult | None:
370        """Get the sync result for the connection.
371
372        If `job_id` is not provided, the most recent sync job will be used.
373
374        Returns `None` if job_id is omitted and no previous jobs are found.
375        """
376        if job_id is None:
377            # Get the most recent sync job
378            results = self.get_previous_sync_logs(
379                limit=1,
380            )
381            if results:
382                return results[0]
383
384            return None
385
386        # Get the sync job by ID (lazy loaded)
387        return SyncResult(
388            workspace=self.workspace,
389            connection=self,
390            job_id=job_id,
391        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
395    @deprecated("Use 'dump_raw_state()' instead.")
396    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
397        """Deprecated. Use `dump_raw_state()` instead."""
398        state_response = api_util.get_connection_state(
399            connection_id=self.connection_id,
400            api_root=self.workspace.api_root,
401            client_id=self.workspace.client_id,
402            client_secret=self.workspace.client_secret,
403            bearer_token=self.workspace.bearer_token,
404            config_api_root=self.workspace.config_api_root,
405        )
406        if state_response.get("stateType") == "not_set":
407            return None
408        return state_response.get("streamState", [])

Deprecated. Use dump_raw_state() instead.

def dump_raw_state( self, *, normalize: bool = True) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
416    def dump_raw_state(
417        self,
418        *,
419        normalize: bool = True,
420    ) -> dict[str, Any] | list[dict[str, Any]]:
421        """Dump the state for this connection.
422
423        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
424        with snake_case keys, suitable for passing to a connector's `--state` flag.
425
426        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
427        includes `stateType` and `connectionId`). This raw format can be passed
428        directly to `import_raw_state()` for backup/restore workflows.
429
430        Args:
431            normalize: If `True` (default), convert to Airbyte protocol format.
432                If `False`, return the raw Config API response.
433
434        Returns:
435            Normalized: list of protocol-format state message dicts (empty list if
436            no state). Raw: the full Config API state dict.
437        """
438        raw = api_util.get_connection_state(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            config_api_root=self.workspace.config_api_root,
445        )
446        if normalize:
447            return _normalize_state_to_protocol(raw)
448        return raw

Dump the state for this connection.

By default, returns a list of Airbyte protocol AirbyteStateMessage dicts with snake_case keys, suitable for passing to a connector's --state flag.

When normalize is False, returns the raw Config API dict (camelCase keys, includes stateType and connectionId). This raw format can be passed directly to import_raw_state() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API response.
Returns:

Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.

def import_raw_state( self, connection_state: dict[str, typing.Any] | list[dict[str, typing.Any]]) -> dict[str, typing.Any]:
450    def import_raw_state(
451        self,
452        connection_state: dict[str, Any] | list[dict[str, Any]],
453    ) -> dict[str, Any]:
454        """Import (restore) the full state for this connection.
455
456        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
457        > could result in broken connections, and/or incorrect sync behavior.
458
459        Replaces the entire connection state with the provided state blob.
460        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
461
462        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
463        The `connectionId` in the blob is always overridden with this connection's
464        ID, making state blobs portable across connections.
465
466        Accepts either format:
467
468        - **Config API format** (dict with `stateType`): passed through directly.
469        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
470          converted to Config API format before sending.
471
472        Args:
473            connection_state: Connection state in either Config API or Airbyte protocol format.
474
475        Returns:
476            The updated connection state as a dictionary.
477
478        Raises:
479            AirbyteConnectionSyncActiveError: If a sync is currently running on this
480                connection (HTTP 423). Wait for the sync to complete before retrying.
481        """
482        api_state: dict[str, Any]
483        if isinstance(connection_state, list):
484            if not _is_protocol_state_format(connection_state):
485                msg = (
486                    "Expected connection_state list to contain Airbyte protocol state "
487                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
488                    "or LEGACY). Got a list that does not match protocol format."
489                )
490                raise ValueError(msg)
491            api_state = _denormalize_protocol_state_to_api(
492                protocol_messages=connection_state,
493                connection_id=self.connection_id,
494            )
495        elif isinstance(connection_state, dict):
496            if _is_protocol_state_format(connection_state):
497                api_state = _denormalize_protocol_state_to_api(
498                    protocol_messages=[connection_state],
499                    connection_id=self.connection_id,
500                )
501            else:
502                api_state = connection_state
503        else:
504            msg = f"Expected a dict or list, got {type(connection_state)}"
505            raise TypeError(msg)
506
507        return api_util.replace_connection_state(
508            connection_id=self.connection_id,
509            connection_state_dict=api_state,
510            api_root=self.workspace.api_root,
511            client_id=self.workspace.client_id,
512            client_secret=self.workspace.client_secret,
513            bearer_token=self.workspace.bearer_token,
514            config_api_root=self.workspace.config_api_root,
515        )

Import (restore) the full state for this connection.

⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.

Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

This is the counterpart to dump_raw_state() for backup/restore workflows. The connectionId in the blob is always overridden with this connection's ID, making state blobs portable across connections.

Accepts either format:

  • Config API format (dict with stateType): passed through directly.
  • Airbyte protocol format (list of AirbyteStateMessage dicts): automatically converted to Config API format before sending.
Arguments:
  • connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:

The updated connection state as a dictionary.

Raises:
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
def get_stream_state( self, stream_name: str, stream_namespace: str | None = None) -> dict[str, typing.Any] | None:
517    def get_stream_state(
518        self,
519        stream_name: str,
520        stream_namespace: str | None = None,
521    ) -> dict[str, Any] | None:
522        """Get the state blob for a single stream within this connection.
523
524        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
525        not the full connection state envelope.
526
527        This is compatible with `stream`-type state and stream-level entries
528        within a `global`-type state. It is not compatible with `legacy` state.
529        To get or set the entire connection-level state artifact, use
530        `dump_raw_state` and `import_raw_state` instead.
531
532        Args:
533            stream_name: The name of the stream to get state for.
534            stream_namespace: The source-side stream namespace. This refers to the
535                namespace from the source (e.g., database schema), not any destination
536                namespace override set in connection advanced settings.
537
538        Returns:
539            The stream's state blob as a dictionary, or None if the stream is not found.
540        """
541        state_data = self.dump_raw_state(normalize=False)
542        result = ConnectionStateResponse(**state_data)
543
544        streams = _get_stream_list(result)
545        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
546
547        if not matching:
548            available = [s.stream_descriptor.name for s in streams]
549            logger.warning(
550                "Stream '%s' not found in connection state for connection '%s'. "
551                "Available streams: %s",
552                stream_name,
553                self.connection_id,
554                available,
555            )
556            return None
557
558        return matching[0].stream_state

Get the state blob for a single stream within this connection.

Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Arguments:
  • stream_name: The name of the stream to get state for.
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:

The stream's state blob as a dictionary, or None if the stream is not found.

def set_stream_state( self, stream_name: str, state_blob_dict: dict[str, typing.Any], stream_namespace: str | None = None) -> None:
560    def set_stream_state(
561        self,
562        stream_name: str,
563        state_blob_dict: dict[str, Any],
564        stream_namespace: str | None = None,
565    ) -> None:
566        """Set the state for a single stream within this connection.
567
568        Fetches the current full state, replaces only the specified stream's state,
569        then sends the full updated state back to the API. If the stream does not
570        exist in the current state, it is appended.
571
572        This is compatible with `stream`-type state and stream-level entries
573        within a `global`-type state. It is not compatible with `legacy` state.
574        To get or set the entire connection-level state artifact, use
575        `dump_raw_state` and `import_raw_state` instead.
576
577        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
578
579        Args:
580            stream_name: The name of the stream to update state for.
581            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
582            stream_namespace: The source-side stream namespace. This refers to the
583                namespace from the source (e.g., database schema), not any destination
584                namespace override set in connection advanced settings.
585
586        Raises:
587            PyAirbyteInputError: If the connection state type is not supported for
588                stream-level operations (not_set, legacy).
589            AirbyteConnectionSyncActiveError: If a sync is currently running on this
590                connection (HTTP 423). Wait for the sync to complete before retrying.
591        """
592        state_data = self.dump_raw_state(normalize=False)
593        current = ConnectionStateResponse(**state_data)
594
595        if current.state_type == "not_set":
596            raise PyAirbyteInputError(
597                message="Cannot set stream state: connection has no existing state.",
598                context={"connection_id": self.connection_id},
599            )
600
601        if current.state_type == "legacy":
602            raise PyAirbyteInputError(
603                message="Cannot set stream state on a legacy-type connection state.",
604                context={"connection_id": self.connection_id},
605            )
606
607        new_stream_entry = {
608            "streamDescriptor": {
609                "name": stream_name,
610                **(
611                    {
612                        "namespace": stream_namespace,
613                    }
614                    if stream_namespace
615                    else {}
616                ),
617            },
618            "streamState": state_blob_dict,
619        }
620
621        raw_streams: list[dict[str, Any]]
622        if current.state_type == "stream":
623            raw_streams = state_data.get("streamState", [])
624        elif current.state_type == "global":
625            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
626        else:
627            raw_streams = []
628
629        streams = _get_stream_list(current)
630        found = False
631        updated_streams_raw: list[dict[str, Any]] = []
632        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
633            if _match_stream(parsed_s, stream_name, stream_namespace):
634                updated_streams_raw.append(new_stream_entry)
635                found = True
636            else:
637                updated_streams_raw.append(raw_s)
638
639        if not found:
640            updated_streams_raw.append(new_stream_entry)
641
642        full_state: dict[str, Any] = {
643            **state_data,
644        }
645
646        if current.state_type == "stream":
647            full_state["streamState"] = updated_streams_raw
648        elif current.state_type == "global":
649            original_global = state_data.get("globalState", {})
650            full_state["globalState"] = {
651                **original_global,
652                "streamStates": updated_streams_raw,
653            }
654
655        self.import_raw_state(full_state)

Set the state for a single stream within this connection.

Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Uses the safe variant that prevents updates while a sync is running (HTTP 423).

Arguments:
  • stream_name: The name of the stream to update state for.
  • state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
  • PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
657    @deprecated("Use 'dump_raw_catalog()' instead.")
658    def get_catalog_artifact(self) -> dict[str, Any] | None:
659        """Get the configured catalog for this connection.
660
661        Returns the full configured catalog (syncCatalog) for this connection,
662        including stream schemas, sync modes, cursor fields, and primary keys.
663
664        Uses the Config API endpoint: POST /v1/web_backend/connections/get
665
666        Returns:
667            Dictionary containing the configured catalog, or `None` if not found.
668        """
669        return self.dump_raw_catalog()

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.

def dump_raw_catalog(self, *, normalize: bool = True) -> dict[str, typing.Any] | None:
671    def dump_raw_catalog(
672        self,
673        *,
674        normalize: bool = True,
675    ) -> dict[str, Any] | None:
676        """Dump the configured catalog for this connection.
677
678        By default, returns the catalog in Airbyte protocol format
679        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
680        to a connector's `--catalog` flag.
681
682        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
683        Config API (camelCase keys, nested `config` block). This raw format can be
684        passed directly to `import_raw_catalog()` for backup/restore workflows.
685
686        Args:
687            normalize: If `True` (default), convert to Airbyte protocol format.
688                If `False`, return the raw Config API catalog.
689
690        Returns:
691            The configured catalog dict, or `None` if not found.
692        """
693        connection_response = api_util.get_connection_catalog(
694            connection_id=self.connection_id,
695            api_root=self.workspace.api_root,
696            client_id=self.workspace.client_id,
697            client_secret=self.workspace.client_secret,
698            bearer_token=self.workspace.bearer_token,
699            config_api_root=self.workspace.config_api_root,
700        )
701        raw = connection_response.get("syncCatalog")
702        if raw is None:
703            return None
704        if normalize:
705            return _normalize_catalog_to_protocol(raw)
706        return raw

Dump the configured catalog for this connection.

By default, returns the catalog in Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys), suitable for passing to a connector's --catalog flag.

When normalize is False, returns the raw syncCatalog dict from the Config API (camelCase keys, nested config block). This raw format can be passed directly to import_raw_catalog() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API catalog.
Returns:

The configured catalog dict, or None if not found.

def import_raw_catalog(self, catalog: dict[str, typing.Any]) -> None:
708    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
709        """Replace the configured catalog for this connection.
710
711        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
712        > could result in broken connections, and/or incorrect sync behavior.
713
714        Accepts a configured catalog dict and replaces the connection's entire
715        catalog with it. All other connection settings remain unchanged.
716
717        Accepts either format:
718
719        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
720          passed through directly.
721        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
722          automatically converted to Config API format before sending.
723
724        Args:
725            catalog: The configured catalog dict in either format.
726        """
727        if _is_protocol_catalog_format(catalog):
728            catalog = _denormalize_catalog_to_api(catalog)
729
730        api_util.replace_connection_catalog(
731            connection_id=self.connection_id,
732            configured_catalog_dict=catalog,
733            api_root=self.workspace.api_root,
734            client_id=self.workspace.client_id,
735            client_secret=self.workspace.client_secret,
736            bearer_token=self.workspace.bearer_token,
737            config_api_root=self.workspace.config_api_root,
738        )

Replace the configured catalog for this connection.

⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.

Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.

Accepts either format:

  • Config API format (syncCatalog with camelCase keys and nested config): passed through directly.
  • Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys): automatically converted to Config API format before sending.
Arguments:
  • catalog: The configured catalog dict in either format.
def rename(self, name: str) -> CloudConnection:
740    def rename(self, name: str) -> CloudConnection:
741        """Rename the connection.
742
743        Args:
744            name: New name for the connection
745
746        Returns:
747            Updated CloudConnection object with refreshed info
748        """
749        updated_response = api_util.patch_connection(
750            connection_id=self.connection_id,
751            api_root=self.workspace.api_root,
752            client_id=self.workspace.client_id,
753            client_secret=self.workspace.client_secret,
754            bearer_token=self.workspace.bearer_token,
755            name=name,
756        )
757        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
758        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
760    def set_table_prefix(self, prefix: str) -> CloudConnection:
761        """Set the table prefix for the connection.
762
763        Args:
764            prefix: New table prefix to use when syncing to the destination
765
766        Returns:
767            Updated CloudConnection object with refreshed info
768        """
769        updated_response = api_util.patch_connection(
770            connection_id=self.connection_id,
771            api_root=self.workspace.api_root,
772            client_id=self.workspace.client_id,
773            client_secret=self.workspace.client_secret,
774            bearer_token=self.workspace.bearer_token,
775            prefix=prefix,
776        )
777        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
778        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
780    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
781        """Set the selected streams for the connection.
782
783        This is a destructive operation that can break existing connections if the
784        stream selection is changed incorrectly. Use with caution.
785
786        Args:
787            stream_names: List of stream names to sync
788
789        Returns:
790            Updated CloudConnection object with refreshed info
791        """
792        configurations = api_util.build_stream_configurations(stream_names)
793
794        updated_response = api_util.patch_connection(
795            connection_id=self.connection_id,
796            api_root=self.workspace.api_root,
797            client_id=self.workspace.client_id,
798            client_secret=self.workspace.client_secret,
799            bearer_token=self.workspace.bearer_token,
800            configurations=configurations,
801        )
802        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
803        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info

enabled: bool
807    @property
808    def enabled(self) -> bool:
809        """Get the current enabled status of the connection.
810
811        This property always fetches fresh data from the API to ensure accuracy,
812        as another process or user may have toggled the setting.
813
814        Returns:
815            True if the connection status is 'active', False otherwise.
816        """
817        connection_info = self._fetch_connection_info(force_refresh=True)
818        return connection_info.status == "active"

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
830    def set_enabled(
831        self,
832        *,
833        enabled: bool,
834        ignore_noop: bool = True,
835    ) -> None:
836        """Set the enabled status of the connection.
837
838        Args:
839            enabled: True to enable (set status to 'active'), False to disable
840                (set status to 'inactive').
841            ignore_noop: If True (default), silently return if the connection is already
842                in the requested state. If False, raise ValueError when the requested
843                state matches the current state.
844
845        Raises:
846            ValueError: If ignore_noop is False and the connection is already in the
847                requested state.
848        """
849        # Always fetch fresh data to check current status
850        connection_info = self._fetch_connection_info(force_refresh=True)
851        current_status = connection_info.status
852        desired_status = "active" if enabled else "inactive"
853
854        if current_status == desired_status:
855            if ignore_noop:
856                return
857            raise ValueError(
858                f"Connection is already {'enabled' if enabled else 'disabled'}. "
859                f"Current status: {current_status}"
860            )
861
862        updated_response = api_util.patch_connection(
863            connection_id=self.connection_id,
864            api_root=self.workspace.api_root,
865            client_id=self.workspace.client_id,
866            client_secret=self.workspace.client_secret,
867            bearer_token=self.workspace.bearer_token,
868            status=desired_status,
869        )
870        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
def set_schedule(self, cron_expression: str) -> None:
874    def set_schedule(
875        self,
876        cron_expression: str,
877    ) -> None:
878        """Set a cron schedule for the connection.
879
880        Args:
881            cron_expression: A cron expression defining when syncs should run.
882
883        Examples:
884                - "0 0 * * *" - Daily at midnight UTC
885                - "0 */6 * * *" - Every 6 hours
886                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
887        """
888        updated_response = api_util.patch_connection(
889            connection_id=self.connection_id,
890            api_root=self.workspace.api_root,
891            client_id=self.workspace.client_id,
892            client_secret=self.workspace.client_secret,
893            bearer_token=self.workspace.bearer_token,
894            schedule=api_util.build_connection_schedule(
895                schedule_type="cron",
896                cron_expression=cron_expression,
897            ),
898        )
899        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
901    def set_manual_schedule(self) -> None:
902        """Set the connection to manual scheduling.
903
904        Disables automatic syncs. Syncs will only run when manually triggered.
905        """
906        updated_response = api_util.patch_connection(
907            connection_id=self.connection_id,
908            api_root=self.workspace.api_root,
909            client_id=self.workspace.client_id,
910            client_secret=self.workspace.client_secret,
911            bearer_token=self.workspace.bearer_token,
912            schedule=api_util.build_connection_schedule(schedule_type="manual"),
913        )
914        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
918    def permanently_delete(
919        self,
920        *,
921        cascade_delete_source: bool = False,
922        cascade_delete_destination: bool = False,
923    ) -> None:
924        """Delete the connection.
925
926        Args:
927            cascade_delete_source: Whether to also delete the source.
928            cascade_delete_destination: Whether to also delete the destination.
929        """
930        self.workspace.permanently_delete_connection(self)
931
932        if cascade_delete_source:
933            self.workspace.permanently_delete_source(self.source_id)
934
935        if cascade_delete_destination:
936            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.