airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6import logging
  7from typing import TYPE_CHECKING, Any, Literal, overload
  8
  9from typing_extensions import deprecated
 10
 11from airbyte._util import api_util
 12from airbyte.cloud._connection_catalog import (
 13    _denormalize_catalog_to_api,
 14    _is_protocol_catalog_format,
 15    _normalize_catalog_to_protocol,
 16)
 17from airbyte.cloud._connection_state import (
 18    ConnectionStateResponse,
 19    _denormalize_protocol_state_to_api,
 20    _get_stream_list,
 21    _is_protocol_state_format,
 22    _match_stream,
 23    _normalize_state_to_protocol,
 24)
 25from airbyte.cloud.connectors import CloudDestination, CloudSource
 26from airbyte.cloud.sync_results import SyncResult
 27from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError
 28
 29
 30logger = logging.getLogger(__name__)
 31
 32
 33if TYPE_CHECKING:
 34    from airbyte_api.models import ConnectionResponse, JobResponse, JobTypeEnum
 35
 36    from airbyte.cloud.workspaces import CloudWorkspace
 37
 38
 39class CloudConnection:  # noqa: PLR0904  # Too many public methods
 40    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 41
 42    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 43    """
 44
 45    def __init__(
 46        self,
 47        workspace: CloudWorkspace,
 48        connection_id: str,
 49        source: str | None = None,
 50        destination: str | None = None,
 51    ) -> None:
 52        """It is not recommended to create a `CloudConnection` object directly.
 53
 54        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 55        """
 56        self.connection_id = connection_id
 57        """The ID of the connection."""
 58
 59        self.workspace = workspace
 60        """The workspace that the connection belongs to."""
 61
 62        self._source_id = source
 63        """The ID of the source."""
 64
 65        self._destination_id = destination
 66        """The ID of the destination."""
 67
 68        self._connection_info: ConnectionResponse | None = None
 69        """The connection info object. (Cached.)"""
 70
 71        self._cloud_source_object: CloudSource | None = None
 72        """The source object. (Cached.)"""
 73
 74        self._cloud_destination_object: CloudDestination | None = None
 75        """The destination object. (Cached.)"""
 76
 77    def _fetch_connection_info(
 78        self,
 79        *,
 80        force_refresh: bool = False,
 81        verify: bool = True,
 82    ) -> ConnectionResponse:
 83        """Fetch and cache connection info from the API.
 84
 85        By default, this method will only fetch from the API if connection info is not
 86        already cached. It also verifies that the connection belongs to the expected
 87        workspace unless verification is explicitly disabled.
 88
 89        Args:
 90            force_refresh: If True, always fetch from the API even if cached.
 91                If False (default), only fetch if not already cached.
 92            verify: If True (default), verify that the connection is valid (e.g., that
 93                the workspace_id matches this object's workspace). Raises an error if
 94                validation fails.
 95
 96        Returns:
 97            The ConnectionResponse from the API.
 98
 99        Raises:
100            AirbyteWorkspaceMismatchError: If verify is True and the connection's
101                workspace_id doesn't match the expected workspace.
102            AirbyteMissingResourceError: If the connection doesn't exist.
103        """
104        if not force_refresh and self._connection_info is not None:
105            # Use cached info, but still verify if requested
106            if verify:
107                self._verify_workspace_match(self._connection_info)
108            return self._connection_info
109
110        # Fetch from API
111        connection_info = api_util.get_connection(
112            workspace_id=self.workspace.workspace_id,
113            connection_id=self.connection_id,
114            api_root=self.workspace.api_root,
115            client_id=self.workspace.client_id,
116            client_secret=self.workspace.client_secret,
117            bearer_token=self.workspace.bearer_token,
118        )
119
120        # Cache the result first (before verification may raise)
121        self._connection_info = connection_info
122
123        # Verify if requested
124        if verify:
125            self._verify_workspace_match(connection_info)
126
127        return connection_info
128
129    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
130        """Verify that the connection belongs to the expected workspace.
131
132        Raises:
133            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
134        """
135        if connection_info.workspace_id != self.workspace.workspace_id:
136            raise AirbyteWorkspaceMismatchError(
137                resource_type="connection",
138                resource_id=self.connection_id,
139                workspace=self.workspace,
140                expected_workspace_id=self.workspace.workspace_id,
141                actual_workspace_id=connection_info.workspace_id,
142                message=(
143                    f"Connection '{self.connection_id}' belongs to workspace "
144                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
145                ),
146            )
147
148    def check_is_valid(self) -> bool:
149        """Check if this connection exists and belongs to the expected workspace.
150
151        This method fetches connection info from the API (if not already cached) and
152        verifies that the connection's workspace_id matches the workspace associated
153        with this CloudConnection object.
154
155        Returns:
156            True if the connection exists and belongs to the expected workspace.
157
158        Raises:
159            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
160            AirbyteMissingResourceError: If the connection doesn't exist.
161        """
162        self._fetch_connection_info(force_refresh=False, verify=True)
163        return True
164
165    @classmethod
166    def _from_connection_response(
167        cls,
168        workspace: CloudWorkspace,
169        connection_response: ConnectionResponse,
170    ) -> CloudConnection:
171        """Create a CloudConnection from a ConnectionResponse."""
172        result = cls(
173            workspace=workspace,
174            connection_id=connection_response.connection_id,
175            source=connection_response.source_id,
176            destination=connection_response.destination_id,
177        )
178        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
179        return result
180
181    # Properties
182
183    @property
184    def name(self) -> str | None:
185        """Get the display name of the connection, if available.
186
187        E.g. "My Postgres to Snowflake", not the connection ID.
188        """
189        if not self._connection_info:
190            self._connection_info = self._fetch_connection_info()
191
192        return self._connection_info.name
193
194    @property
195    def source_id(self) -> str:
196        """The ID of the source."""
197        if not self._source_id:
198            if not self._connection_info:
199                self._connection_info = self._fetch_connection_info()
200
201            self._source_id = self._connection_info.source_id
202
203        return self._source_id
204
205    @property
206    def source(self) -> CloudSource:
207        """Get the source object."""
208        if self._cloud_source_object:
209            return self._cloud_source_object
210
211        self._cloud_source_object = CloudSource(
212            workspace=self.workspace,
213            connector_id=self.source_id,
214        )
215        return self._cloud_source_object
216
217    @property
218    def destination_id(self) -> str:
219        """The ID of the destination."""
220        if not self._destination_id:
221            if not self._connection_info:
222                self._connection_info = self._fetch_connection_info()
223
224            self._destination_id = self._connection_info.destination_id
225
226        return self._destination_id
227
228    @property
229    def destination(self) -> CloudDestination:
230        """Get the destination object."""
231        if self._cloud_destination_object:
232            return self._cloud_destination_object
233
234        self._cloud_destination_object = CloudDestination(
235            workspace=self.workspace,
236            connector_id=self.destination_id,
237        )
238        return self._cloud_destination_object
239
240    @property
241    def stream_names(self) -> list[str]:
242        """The stream names."""
243        if not self._connection_info:
244            self._connection_info = self._fetch_connection_info()
245
246        return [stream.name for stream in self._connection_info.configurations.streams or []]
247
248    @property
249    def table_prefix(self) -> str:
250        """The table prefix."""
251        if not self._connection_info:
252            self._connection_info = self._fetch_connection_info()
253
254        return self._connection_info.prefix or ""
255
256    @property
257    def connection_url(self) -> str | None:
258        """The web URL to the connection."""
259        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
260
261    @property
262    def job_history_url(self) -> str | None:
263        """The URL to the job history for the connection."""
264        return f"{self.connection_url}/timeline"
265
266    # Run Sync
267
268    def run_sync(
269        self,
270        *,
271        wait: bool = True,
272        wait_timeout: int = 300,
273    ) -> SyncResult:
274        """Run a sync."""
275        connection_response = api_util.run_connection(
276            connection_id=self.connection_id,
277            api_root=self.workspace.api_root,
278            workspace_id=self.workspace.workspace_id,
279            client_id=self.workspace.client_id,
280            client_secret=self.workspace.client_secret,
281            bearer_token=self.workspace.bearer_token,
282        )
283        sync_result = SyncResult(
284            workspace=self.workspace,
285            connection=self,
286            job_id=connection_response.job_id,
287        )
288
289        if wait:
290            sync_result.wait_for_completion(
291                wait_timeout=wait_timeout,
292                raise_failure=True,
293                raise_timeout=True,
294            )
295
296        return sync_result
297
298    def __repr__(self) -> str:
299        """String representation of the connection."""
300        return (
301            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
302            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
303        )
304
305    # Logs
306
307    def get_previous_sync_logs(
308        self,
309        *,
310        limit: int = 20,
311        offset: int | None = None,
312        from_tail: bool = True,
313        job_type: JobTypeEnum | None = None,
314    ) -> list[SyncResult]:
315        """Get previous sync jobs for a connection with pagination support.
316
317        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
318        rows_synced, start_time). Full log text can be fetched lazily via
319        `SyncResult.get_full_log_text()`.
320
321        Args:
322            limit: Maximum number of jobs to return. Defaults to 20.
323            offset: Number of jobs to skip from the beginning. Defaults to None (0).
324            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
325                If False, returns jobs ordered oldest-first (createdAt ASC).
326                Defaults to True.
327            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
328                If not specified, defaults to sync and reset jobs only (API default behavior).
329
330        Returns:
331            A list of SyncResult objects representing the sync jobs.
332        """
333        order_by = (
334            api_util.JOB_ORDER_BY_CREATED_AT_DESC
335            if from_tail
336            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
337        )
338        sync_logs: list[JobResponse] = api_util.get_job_logs(
339            connection_id=self.connection_id,
340            api_root=self.workspace.api_root,
341            workspace_id=self.workspace.workspace_id,
342            limit=limit,
343            offset=offset,
344            order_by=order_by,
345            job_type=job_type,
346            client_id=self.workspace.client_id,
347            client_secret=self.workspace.client_secret,
348            bearer_token=self.workspace.bearer_token,
349        )
350        return [
351            SyncResult(
352                workspace=self.workspace,
353                connection=self,
354                job_id=sync_log.job_id,
355                _latest_job_info=sync_log,
356            )
357            for sync_log in sync_logs
358        ]
359
360    def get_sync_result(
361        self,
362        job_id: int | None = None,
363    ) -> SyncResult | None:
364        """Get the sync result for the connection.
365
366        If `job_id` is not provided, the most recent sync job will be used.
367
368        Returns `None` if job_id is omitted and no previous jobs are found.
369        """
370        if job_id is None:
371            # Get the most recent sync job
372            results = self.get_previous_sync_logs(
373                limit=1,
374            )
375            if results:
376                return results[0]
377
378            return None
379
380        # Get the sync job by ID (lazy loaded)
381        return SyncResult(
382            workspace=self.workspace,
383            connection=self,
384            job_id=job_id,
385        )
386
387    # Artifacts
388
389    @deprecated("Use 'dump_raw_state()' instead.")
390    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
391        """Deprecated. Use `dump_raw_state()` instead."""
392        state_response = api_util.get_connection_state(
393            connection_id=self.connection_id,
394            api_root=self.workspace.api_root,
395            client_id=self.workspace.client_id,
396            client_secret=self.workspace.client_secret,
397            bearer_token=self.workspace.bearer_token,
398        )
399        if state_response.get("stateType") == "not_set":
400            return None
401        return state_response.get("streamState", [])
402
403    @overload
404    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...
405
406    @overload
407    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...
408
409    def dump_raw_state(
410        self,
411        *,
412        normalize: bool = True,
413    ) -> dict[str, Any] | list[dict[str, Any]]:
414        """Dump the state for this connection.
415
416        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
417        with snake_case keys, suitable for passing to a connector's `--state` flag.
418
419        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
420        includes `stateType` and `connectionId`). This raw format can be passed
421        directly to `import_raw_state()` for backup/restore workflows.
422
423        Args:
424            normalize: If `True` (default), convert to Airbyte protocol format.
425                If `False`, return the raw Config API response.
426
427        Returns:
428            Normalized: list of protocol-format state message dicts (empty list if
429            no state). Raw: the full Config API state dict.
430        """
431        raw = api_util.get_connection_state(
432            connection_id=self.connection_id,
433            api_root=self.workspace.api_root,
434            client_id=self.workspace.client_id,
435            client_secret=self.workspace.client_secret,
436            bearer_token=self.workspace.bearer_token,
437        )
438        if normalize:
439            return _normalize_state_to_protocol(raw)
440        return raw
441
442    def import_raw_state(
443        self,
444        connection_state: dict[str, Any] | list[dict[str, Any]],
445    ) -> dict[str, Any]:
446        """Import (restore) the full state for this connection.
447
448        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
449        > could result in broken connections, and/or incorrect sync behavior.
450
451        Replaces the entire connection state with the provided state blob.
452        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
453
454        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
455        The `connectionId` in the blob is always overridden with this connection's
456        ID, making state blobs portable across connections.
457
458        Accepts either format:
459
460        - **Config API format** (dict with `stateType`): passed through directly.
461        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
462          converted to Config API format before sending.
463
464        Args:
465            connection_state: Connection state in either Config API or Airbyte protocol format.
466
467        Returns:
468            The updated connection state as a dictionary.
469
470        Raises:
471            AirbyteConnectionSyncActiveError: If a sync is currently running on this
472                connection (HTTP 423). Wait for the sync to complete before retrying.
473        """
474        api_state: dict[str, Any]
475        if isinstance(connection_state, list):
476            if not _is_protocol_state_format(connection_state):
477                msg = (
478                    "Expected connection_state list to contain Airbyte protocol state "
479                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
480                    "or LEGACY). Got a list that does not match protocol format."
481                )
482                raise ValueError(msg)
483            api_state = _denormalize_protocol_state_to_api(
484                protocol_messages=connection_state,
485                connection_id=self.connection_id,
486            )
487        elif isinstance(connection_state, dict):
488            if _is_protocol_state_format(connection_state):
489                api_state = _denormalize_protocol_state_to_api(
490                    protocol_messages=[connection_state],
491                    connection_id=self.connection_id,
492                )
493            else:
494                api_state = connection_state
495        else:
496            msg = f"Expected a dict or list, got {type(connection_state)}"
497            raise TypeError(msg)
498
499        return api_util.replace_connection_state(
500            connection_id=self.connection_id,
501            connection_state_dict=api_state,
502            api_root=self.workspace.api_root,
503            client_id=self.workspace.client_id,
504            client_secret=self.workspace.client_secret,
505            bearer_token=self.workspace.bearer_token,
506        )
507
508    def get_stream_state(
509        self,
510        stream_name: str,
511        stream_namespace: str | None = None,
512    ) -> dict[str, Any] | None:
513        """Get the state blob for a single stream within this connection.
514
515        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
516        not the full connection state envelope.
517
518        This is compatible with `stream`-type state and stream-level entries
519        within a `global`-type state. It is not compatible with `legacy` state.
520        To get or set the entire connection-level state artifact, use
521        `dump_raw_state` and `import_raw_state` instead.
522
523        Args:
524            stream_name: The name of the stream to get state for.
525            stream_namespace: The source-side stream namespace. This refers to the
526                namespace from the source (e.g., database schema), not any destination
527                namespace override set in connection advanced settings.
528
529        Returns:
530            The stream's state blob as a dictionary, or None if the stream is not found.
531        """
532        state_data = self.dump_raw_state(normalize=False)
533        result = ConnectionStateResponse(**state_data)
534
535        streams = _get_stream_list(result)
536        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
537
538        if not matching:
539            available = [s.stream_descriptor.name for s in streams]
540            logger.warning(
541                "Stream '%s' not found in connection state for connection '%s'. "
542                "Available streams: %s",
543                stream_name,
544                self.connection_id,
545                available,
546            )
547            return None
548
549        return matching[0].stream_state
550
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        # Fetch the full raw (Config API format) state envelope and a parsed view of it.
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        # Stream-level updates require an existing, non-legacy state envelope.
        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # New entry in Config API (camelCase) shape; "namespace" is included
        # only when a namespace was provided.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Locate the raw per-stream entries; their location in the envelope
        # depends on the state type ("stream" vs "global").
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Walk raw and parsed entries in lockstep: match on the parsed view,
        # but copy through the untouched raw dicts so unrelated entries are
        # preserved byte-for-byte.
        # NOTE(review): assumes `_get_stream_list` preserves the order of the
        # raw entries so the zip pairing is positionally correct — confirm.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        # Append as a new stream entry when no existing entry matched.
        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full envelope, swapping in the updated stream list at the
        # location appropriate for the state type.
        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        # Push the rebuilt envelope back via the safe (sync-aware) replace API.
        self.import_raw_state(full_state)
647
648    @deprecated("Use 'dump_raw_catalog()' instead.")
649    def get_catalog_artifact(self) -> dict[str, Any] | None:
650        """Get the configured catalog for this connection.
651
652        Returns the full configured catalog (syncCatalog) for this connection,
653        including stream schemas, sync modes, cursor fields, and primary keys.
654
655        Uses the Config API endpoint: POST /v1/web_backend/connections/get
656
657        Returns:
658            Dictionary containing the configured catalog, or `None` if not found.
659        """
660        return self.dump_raw_catalog()
661
662    def dump_raw_catalog(
663        self,
664        *,
665        normalize: bool = True,
666    ) -> dict[str, Any] | None:
667        """Dump the configured catalog for this connection.
668
669        By default, returns the catalog in Airbyte protocol format
670        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
671        to a connector's `--catalog` flag.
672
673        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
674        Config API (camelCase keys, nested `config` block). This raw format can be
675        passed directly to `import_raw_catalog()` for backup/restore workflows.
676
677        Args:
678            normalize: If `True` (default), convert to Airbyte protocol format.
679                If `False`, return the raw Config API catalog.
680
681        Returns:
682            The configured catalog dict, or `None` if not found.
683        """
684        connection_response = api_util.get_connection_catalog(
685            connection_id=self.connection_id,
686            api_root=self.workspace.api_root,
687            client_id=self.workspace.client_id,
688            client_secret=self.workspace.client_secret,
689            bearer_token=self.workspace.bearer_token,
690        )
691        raw = connection_response.get("syncCatalog")
692        if raw is None:
693            return None
694        if normalize:
695            return _normalize_catalog_to_protocol(raw)
696        return raw
697
698    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
699        """Replace the configured catalog for this connection.
700
701        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
702        > could result in broken connections, and/or incorrect sync behavior.
703
704        Accepts a configured catalog dict and replaces the connection's entire
705        catalog with it. All other connection settings remain unchanged.
706
707        Accepts either format:
708
709        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
710          passed through directly.
711        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
712          automatically converted to Config API format before sending.
713
714        Args:
715            catalog: The configured catalog dict in either format.
716        """
717        if _is_protocol_catalog_format(catalog):
718            catalog = _denormalize_catalog_to_api(catalog)
719
720        api_util.replace_connection_catalog(
721            connection_id=self.connection_id,
722            configured_catalog_dict=catalog,
723            api_root=self.workspace.api_root,
724            client_id=self.workspace.client_id,
725            client_secret=self.workspace.client_secret,
726            bearer_token=self.workspace.bearer_token,
727        )
728
729    def rename(self, name: str) -> CloudConnection:
730        """Rename the connection.
731
732        Args:
733            name: New name for the connection
734
735        Returns:
736            Updated CloudConnection object with refreshed info
737        """
738        updated_response = api_util.patch_connection(
739            connection_id=self.connection_id,
740            api_root=self.workspace.api_root,
741            client_id=self.workspace.client_id,
742            client_secret=self.workspace.client_secret,
743            bearer_token=self.workspace.bearer_token,
744            name=name,
745        )
746        self._connection_info = updated_response
747        return self
748
749    def set_table_prefix(self, prefix: str) -> CloudConnection:
750        """Set the table prefix for the connection.
751
752        Args:
753            prefix: New table prefix to use when syncing to the destination
754
755        Returns:
756            Updated CloudConnection object with refreshed info
757        """
758        updated_response = api_util.patch_connection(
759            connection_id=self.connection_id,
760            api_root=self.workspace.api_root,
761            client_id=self.workspace.client_id,
762            client_secret=self.workspace.client_secret,
763            bearer_token=self.workspace.bearer_token,
764            prefix=prefix,
765        )
766        self._connection_info = updated_response
767        return self
768
769    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
770        """Set the selected streams for the connection.
771
772        This is a destructive operation that can break existing connections if the
773        stream selection is changed incorrectly. Use with caution.
774
775        Args:
776            stream_names: List of stream names to sync
777
778        Returns:
779            Updated CloudConnection object with refreshed info
780        """
781        configurations = api_util.build_stream_configurations(stream_names)
782
783        updated_response = api_util.patch_connection(
784            connection_id=self.connection_id,
785            api_root=self.workspace.api_root,
786            client_id=self.workspace.client_id,
787            client_secret=self.workspace.client_secret,
788            bearer_token=self.workspace.bearer_token,
789            configurations=configurations,
790        )
791        self._connection_info = updated_response
792        return self
793
794    # Enable/Disable
795
796    @property
797    def enabled(self) -> bool:
798        """Get the current enabled status of the connection.
799
800        This property always fetches fresh data from the API to ensure accuracy,
801        as another process or user may have toggled the setting.
802
803        Returns:
804            True if the connection status is 'active', False otherwise.
805        """
806        connection_info = self._fetch_connection_info(force_refresh=True)
807        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
808
    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set the enabled status of the connection.

        Args:
            value: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
        """
        # Delegates to set_enabled(), whose default ignore_noop=True makes
        # assigning the current state a silent no-op.
        self.set_enabled(enabled=value)
818
819    def set_enabled(
820        self,
821        *,
822        enabled: bool,
823        ignore_noop: bool = True,
824    ) -> None:
825        """Set the enabled status of the connection.
826
827        Args:
828            enabled: True to enable (set status to 'active'), False to disable
829                (set status to 'inactive').
830            ignore_noop: If True (default), silently return if the connection is already
831                in the requested state. If False, raise ValueError when the requested
832                state matches the current state.
833
834        Raises:
835            ValueError: If ignore_noop is False and the connection is already in the
836                requested state.
837        """
838        # Always fetch fresh data to check current status
839        connection_info = self._fetch_connection_info(force_refresh=True)
840        current_status = connection_info.status
841        desired_status = (
842            api_util.models.ConnectionStatusEnum.ACTIVE
843            if enabled
844            else api_util.models.ConnectionStatusEnum.INACTIVE
845        )
846
847        if current_status == desired_status:
848            if ignore_noop:
849                return
850            raise ValueError(
851                f"Connection is already {'enabled' if enabled else 'disabled'}. "
852                f"Current status: {current_status}"
853            )
854
855        updated_response = api_util.patch_connection(
856            connection_id=self.connection_id,
857            api_root=self.workspace.api_root,
858            client_id=self.workspace.client_id,
859            client_secret=self.workspace.client_secret,
860            bearer_token=self.workspace.bearer_token,
861            status=desired_status,
862        )
863        self._connection_info = updated_response
864
865    # Scheduling
866
867    def set_schedule(
868        self,
869        cron_expression: str,
870    ) -> None:
871        """Set a cron schedule for the connection.
872
873        Args:
874            cron_expression: A cron expression defining when syncs should run.
875
876        Examples:
877                - "0 0 * * *" - Daily at midnight UTC
878                - "0 */6 * * *" - Every 6 hours
879                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
880        """
881        schedule = api_util.models.AirbyteAPIConnectionSchedule(
882            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
883            cron_expression=cron_expression,
884        )
885        updated_response = api_util.patch_connection(
886            connection_id=self.connection_id,
887            api_root=self.workspace.api_root,
888            client_id=self.workspace.client_id,
889            client_secret=self.workspace.client_secret,
890            bearer_token=self.workspace.bearer_token,
891            schedule=schedule,
892        )
893        self._connection_info = updated_response
894
895    def set_manual_schedule(self) -> None:
896        """Set the connection to manual scheduling.
897
898        Disables automatic syncs. Syncs will only run when manually triggered.
899        """
900        schedule = api_util.models.AirbyteAPIConnectionSchedule(
901            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
902        )
903        updated_response = api_util.patch_connection(
904            connection_id=self.connection_id,
905            api_root=self.workspace.api_root,
906            client_id=self.workspace.client_id,
907            client_secret=self.workspace.client_secret,
908            bearer_token=self.workspace.bearer_token,
909            schedule=schedule,
910        )
911        self._connection_info = updated_response
912
913    # Deletions
914
    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        The connection itself is deleted first; the source and/or destination
        are deleted afterwards only if the corresponding cascade flag is set.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        # The `source_id` / `destination_id` properties are only accessed when a
        # cascade is requested, so no extra API lookup happens otherwise.
        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
logger = <Logger airbyte.cloud.connections (INFO)>
class CloudConnection:
 40class CloudConnection:  # noqa: PLR0904  # Too many public methods
 41    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 42
 43    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 44    """
 45
    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: Optional ID of the source; resolved lazily from the API if omitted.
            destination: Optional ID of the destination; resolved lazily from the
                API if omitted.
        """
        self.connection_id: str = connection_id
        """The ID of the connection."""

        self.workspace: CloudWorkspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id: str | None = source
        """The ID of the source."""

        self._destination_id: str | None = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""
 77
 78    def _fetch_connection_info(
 79        self,
 80        *,
 81        force_refresh: bool = False,
 82        verify: bool = True,
 83    ) -> ConnectionResponse:
 84        """Fetch and cache connection info from the API.
 85
 86        By default, this method will only fetch from the API if connection info is not
 87        already cached. It also verifies that the connection belongs to the expected
 88        workspace unless verification is explicitly disabled.
 89
 90        Args:
 91            force_refresh: If True, always fetch from the API even if cached.
 92                If False (default), only fetch if not already cached.
 93            verify: If True (default), verify that the connection is valid (e.g., that
 94                the workspace_id matches this object's workspace). Raises an error if
 95                validation fails.
 96
 97        Returns:
 98            The ConnectionResponse from the API.
 99
100        Raises:
101            AirbyteWorkspaceMismatchError: If verify is True and the connection's
102                workspace_id doesn't match the expected workspace.
103            AirbyteMissingResourceError: If the connection doesn't exist.
104        """
105        if not force_refresh and self._connection_info is not None:
106            # Use cached info, but still verify if requested
107            if verify:
108                self._verify_workspace_match(self._connection_info)
109            return self._connection_info
110
111        # Fetch from API
112        connection_info = api_util.get_connection(
113            workspace_id=self.workspace.workspace_id,
114            connection_id=self.connection_id,
115            api_root=self.workspace.api_root,
116            client_id=self.workspace.client_id,
117            client_secret=self.workspace.client_secret,
118            bearer_token=self.workspace.bearer_token,
119        )
120
121        # Cache the result first (before verification may raise)
122        self._connection_info = connection_info
123
124        # Verify if requested
125        if verify:
126            self._verify_workspace_match(connection_info)
127
128        return connection_info
129
130    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
131        """Verify that the connection belongs to the expected workspace.
132
133        Raises:
134            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
135        """
136        if connection_info.workspace_id != self.workspace.workspace_id:
137            raise AirbyteWorkspaceMismatchError(
138                resource_type="connection",
139                resource_id=self.connection_id,
140                workspace=self.workspace,
141                expected_workspace_id=self.workspace.workspace_id,
142                actual_workspace_id=connection_info.workspace_id,
143                message=(
144                    f"Connection '{self.connection_id}' belongs to workspace "
145                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
146                ),
147            )
148
    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        # Any failure mode surfaces as an exception from the fetch/verify step;
        # reaching the return statement means the connection is valid.
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True
165
166    @classmethod
167    def _from_connection_response(
168        cls,
169        workspace: CloudWorkspace,
170        connection_response: ConnectionResponse,
171    ) -> CloudConnection:
172        """Create a CloudConnection from a ConnectionResponse."""
173        result = cls(
174            workspace=workspace,
175            connection_id=connection_response.connection_id,
176            source=connection_response.source_id,
177            destination=connection_response.destination_id,
178        )
179        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
180        return result
181
182    # Properties
183
184    @property
185    def name(self) -> str | None:
186        """Get the display name of the connection, if available.
187
188        E.g. "My Postgres to Snowflake", not the connection ID.
189        """
190        if not self._connection_info:
191            self._connection_info = self._fetch_connection_info()
192
193        return self._connection_info.name
194
195    @property
196    def source_id(self) -> str:
197        """The ID of the source."""
198        if not self._source_id:
199            if not self._connection_info:
200                self._connection_info = self._fetch_connection_info()
201
202            self._source_id = self._connection_info.source_id
203
204        return self._source_id
205
206    @property
207    def source(self) -> CloudSource:
208        """Get the source object."""
209        if self._cloud_source_object:
210            return self._cloud_source_object
211
212        self._cloud_source_object = CloudSource(
213            workspace=self.workspace,
214            connector_id=self.source_id,
215        )
216        return self._cloud_source_object
217
218    @property
219    def destination_id(self) -> str:
220        """The ID of the destination."""
221        if not self._destination_id:
222            if not self._connection_info:
223                self._connection_info = self._fetch_connection_info()
224
225            self._destination_id = self._connection_info.destination_id
226
227        return self._destination_id
228
229    @property
230    def destination(self) -> CloudDestination:
231        """Get the destination object."""
232        if self._cloud_destination_object:
233            return self._cloud_destination_object
234
235        self._cloud_destination_object = CloudDestination(
236            workspace=self.workspace,
237            connector_id=self.destination_id,
238        )
239        return self._cloud_destination_object
240
241    @property
242    def stream_names(self) -> list[str]:
243        """The stream names."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return [stream.name for stream in self._connection_info.configurations.streams or []]
248
249    @property
250    def table_prefix(self) -> str:
251        """The table prefix."""
252        if not self._connection_info:
253            self._connection_info = self._fetch_connection_info()
254
255        return self._connection_info.prefix or ""
256
257    @property
258    def connection_url(self) -> str | None:
259        """The web URL to the connection."""
260        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
261
262    @property
263    def job_history_url(self) -> str | None:
264        """The URL to the job history for the connection."""
265        return f"{self.connection_url}/timeline"
266
267    # Run Sync
268
269    def run_sync(
270        self,
271        *,
272        wait: bool = True,
273        wait_timeout: int = 300,
274    ) -> SyncResult:
275        """Run a sync."""
276        connection_response = api_util.run_connection(
277            connection_id=self.connection_id,
278            api_root=self.workspace.api_root,
279            workspace_id=self.workspace.workspace_id,
280            client_id=self.workspace.client_id,
281            client_secret=self.workspace.client_secret,
282            bearer_token=self.workspace.bearer_token,
283        )
284        sync_result = SyncResult(
285            workspace=self.workspace,
286            connection=self,
287            job_id=connection_response.job_id,
288        )
289
290        if wait:
291            sync_result.wait_for_completion(
292                wait_timeout=wait_timeout,
293                raise_failure=True,
294                raise_timeout=True,
295            )
296
297        return sync_result
298
299    def __repr__(self) -> str:
300        """String representation of the connection."""
301        return (
302            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
303            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
304        )
305
306    # Logs
307
308    def get_previous_sync_logs(
309        self,
310        *,
311        limit: int = 20,
312        offset: int | None = None,
313        from_tail: bool = True,
314        job_type: JobTypeEnum | None = None,
315    ) -> list[SyncResult]:
316        """Get previous sync jobs for a connection with pagination support.
317
318        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
319        rows_synced, start_time). Full log text can be fetched lazily via
320        `SyncResult.get_full_log_text()`.
321
322        Args:
323            limit: Maximum number of jobs to return. Defaults to 20.
324            offset: Number of jobs to skip from the beginning. Defaults to None (0).
325            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
326                If False, returns jobs ordered oldest-first (createdAt ASC).
327                Defaults to True.
328            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
329                If not specified, defaults to sync and reset jobs only (API default behavior).
330
331        Returns:
332            A list of SyncResult objects representing the sync jobs.
333        """
334        order_by = (
335            api_util.JOB_ORDER_BY_CREATED_AT_DESC
336            if from_tail
337            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
338        )
339        sync_logs: list[JobResponse] = api_util.get_job_logs(
340            connection_id=self.connection_id,
341            api_root=self.workspace.api_root,
342            workspace_id=self.workspace.workspace_id,
343            limit=limit,
344            offset=offset,
345            order_by=order_by,
346            job_type=job_type,
347            client_id=self.workspace.client_id,
348            client_secret=self.workspace.client_secret,
349            bearer_token=self.workspace.bearer_token,
350        )
351        return [
352            SyncResult(
353                workspace=self.workspace,
354                connection=self,
355                job_id=sync_log.job_id,
356                _latest_job_info=sync_log,
357            )
358            for sync_log in sync_logs
359        ]
360
361    def get_sync_result(
362        self,
363        job_id: int | None = None,
364    ) -> SyncResult | None:
365        """Get the sync result for the connection.
366
367        If `job_id` is not provided, the most recent sync job will be used.
368
369        Returns `None` if job_id is omitted and no previous jobs are found.
370        """
371        if job_id is None:
372            # Get the most recent sync job
373            results = self.get_previous_sync_logs(
374                limit=1,
375            )
376            if results:
377                return results[0]
378
379            return None
380
381        # Get the sync job by ID (lazy loaded)
382        return SyncResult(
383            workspace=self.workspace,
384            connection=self,
385            job_id=job_id,
386        )
387
388    # Artifacts
389
    @deprecated("Use 'dump_raw_state()' instead.")
    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Deprecated. Use `dump_raw_state()` instead."""
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        # No state has been recorded yet for this connection.
        if state_response.get("stateType") == "not_set":
            return None
        # NOTE(review): only the top-level `streamState` list is returned here;
        # presumably for `global`- or `legacy`-type state this key is absent and
        # an empty list results — confirm whether that is intended for this
        # deprecated accessor.
        return state_response.get("streamState", [])
403
404    @overload
405    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...
406
407    @overload
408    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...
409
410    def dump_raw_state(
411        self,
412        *,
413        normalize: bool = True,
414    ) -> dict[str, Any] | list[dict[str, Any]]:
415        """Dump the state for this connection.
416
417        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
418        with snake_case keys, suitable for passing to a connector's `--state` flag.
419
420        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
421        includes `stateType` and `connectionId`). This raw format can be passed
422        directly to `import_raw_state()` for backup/restore workflows.
423
424        Args:
425            normalize: If `True` (default), convert to Airbyte protocol format.
426                If `False`, return the raw Config API response.
427
428        Returns:
429            Normalized: list of protocol-format state message dicts (empty list if
430            no state). Raw: the full Config API state dict.
431        """
432        raw = api_util.get_connection_state(
433            connection_id=self.connection_id,
434            api_root=self.workspace.api_root,
435            client_id=self.workspace.client_id,
436            client_secret=self.workspace.client_secret,
437            bearer_token=self.workspace.bearer_token,
438        )
439        if normalize:
440            return _normalize_state_to_protocol(raw)
441        return raw
442
    def import_raw_state(
        self,
        connection_state: dict[str, Any] | list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Import (restore) the full state for this connection.

        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Replaces the entire connection state with the provided state blob.
        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
        The `connectionId` in the blob is always overridden with this connection's
        ID, making state blobs portable across connections.

        Accepts either format:

        - **Config API format** (dict with `stateType`): passed through directly.
        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
          converted to Config API format before sending.

        Args:
            connection_state: Connection state in either Config API or Airbyte protocol format.

        Returns:
            The updated connection state as a dictionary.

        Raises:
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        api_state: dict[str, Any]
        if isinstance(connection_state, list):
            # A list must be protocol-format state messages; anything else is
            # ambiguous and rejected outright.
            if not _is_protocol_state_format(connection_state):
                msg = (
                    "Expected connection_state list to contain Airbyte protocol state "
                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                    "or LEGACY). Got a list that does not match protocol format."
                )
                raise ValueError(msg)
            api_state = _denormalize_protocol_state_to_api(
                protocol_messages=connection_state,
                connection_id=self.connection_id,
            )
        elif isinstance(connection_state, dict):
            # A dict may be a single protocol message (wrap it in a list for
            # denormalization) or an already-raw Config API blob (pass through).
            if _is_protocol_state_format(connection_state):
                api_state = _denormalize_protocol_state_to_api(
                    protocol_messages=[connection_state],
                    connection_id=self.connection_id,
                )
            else:
                api_state = connection_state
        else:
            msg = f"Expected a dict or list, got {type(connection_state)}"
            raise TypeError(msg)

        # Safe replace: the API rejects the update (HTTP 423) while a sync runs.
        return api_util.replace_connection_state(
            connection_id=self.connection_id,
            connection_state_dict=api_state,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
508
509    def get_stream_state(
510        self,
511        stream_name: str,
512        stream_namespace: str | None = None,
513    ) -> dict[str, Any] | None:
514        """Get the state blob for a single stream within this connection.
515
516        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
517        not the full connection state envelope.
518
519        This is compatible with `stream`-type state and stream-level entries
520        within a `global`-type state. It is not compatible with `legacy` state.
521        To get or set the entire connection-level state artifact, use
522        `dump_raw_state` and `import_raw_state` instead.
523
524        Args:
525            stream_name: The name of the stream to get state for.
526            stream_namespace: The source-side stream namespace. This refers to the
527                namespace from the source (e.g., database schema), not any destination
528                namespace override set in connection advanced settings.
529
530        Returns:
531            The stream's state blob as a dictionary, or None if the stream is not found.
532        """
533        state_data = self.dump_raw_state(normalize=False)
534        result = ConnectionStateResponse(**state_data)
535
536        streams = _get_stream_list(result)
537        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
538
539        if not matching:
540            available = [s.stream_descriptor.name for s in streams]
541            logger.warning(
542                "Stream '%s' not found in connection state for connection '%s'. "
543                "Available streams: %s",
544                stream_name,
545                self.connection_id,
546                available,
547            )
548            return None
549
550        return matching[0].stream_state
551
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        # Work from the raw Config API envelope so unknown keys survive the
        # round-trip; `current` is the parsed view used for matching.
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        # Raw (camelCase) entry that will replace or extend the stream list;
        # the namespace key is only included when a namespace was given.
        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        # Locate the raw stream list for the state type ("stream" keeps it at the
        # top level; "global" nests it under globalState.streamStates).
        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        # Pair each raw entry with its parsed counterpart: match on the parsed
        # view, but carry the untouched raw dicts into the rebuilt list.
        # NOTE(review): strict=False silently stops at the shorter of the two
        # lists — presumably they are always the same length; confirm.
        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        # Stream not present yet: append it rather than failing.
        if not found:
            updated_streams_raw.append(new_stream_entry)

        # Rebuild the full envelope, preserving all other top-level keys.
        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)
648
649    @deprecated("Use 'dump_raw_catalog()' instead.")
650    def get_catalog_artifact(self) -> dict[str, Any] | None:
651        """Get the configured catalog for this connection.
652
653        Returns the full configured catalog (syncCatalog) for this connection,
654        including stream schemas, sync modes, cursor fields, and primary keys.
655
656        Uses the Config API endpoint: POST /v1/web_backend/connections/get
657
658        Returns:
659            Dictionary containing the configured catalog, or `None` if not found.
660        """
661        return self.dump_raw_catalog()
662
663    def dump_raw_catalog(
664        self,
665        *,
666        normalize: bool = True,
667    ) -> dict[str, Any] | None:
668        """Dump the configured catalog for this connection.
669
670        By default, returns the catalog in Airbyte protocol format
671        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
672        to a connector's `--catalog` flag.
673
674        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
675        Config API (camelCase keys, nested `config` block). This raw format can be
676        passed directly to `import_raw_catalog()` for backup/restore workflows.
677
678        Args:
679            normalize: If `True` (default), convert to Airbyte protocol format.
680                If `False`, return the raw Config API catalog.
681
682        Returns:
683            The configured catalog dict, or `None` if not found.
684        """
685        connection_response = api_util.get_connection_catalog(
686            connection_id=self.connection_id,
687            api_root=self.workspace.api_root,
688            client_id=self.workspace.client_id,
689            client_secret=self.workspace.client_secret,
690            bearer_token=self.workspace.bearer_token,
691        )
692        raw = connection_response.get("syncCatalog")
693        if raw is None:
694            return None
695        if normalize:
696            return _normalize_catalog_to_protocol(raw)
697        return raw
698
699    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
700        """Replace the configured catalog for this connection.
701
702        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
703        > could result in broken connections, and/or incorrect sync behavior.
704
705        Accepts a configured catalog dict and replaces the connection's entire
706        catalog with it. All other connection settings remain unchanged.
707
708        Accepts either format:
709
710        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
711          passed through directly.
712        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
713          automatically converted to Config API format before sending.
714
715        Args:
716            catalog: The configured catalog dict in either format.
717        """
718        if _is_protocol_catalog_format(catalog):
719            catalog = _denormalize_catalog_to_api(catalog)
720
721        api_util.replace_connection_catalog(
722            connection_id=self.connection_id,
723            configured_catalog_dict=catalog,
724            api_root=self.workspace.api_root,
725            client_id=self.workspace.client_id,
726            client_secret=self.workspace.client_secret,
727            bearer_token=self.workspace.bearer_token,
728        )
729
730    def rename(self, name: str) -> CloudConnection:
731        """Rename the connection.
732
733        Args:
734            name: New name for the connection
735
736        Returns:
737            Updated CloudConnection object with refreshed info
738        """
739        updated_response = api_util.patch_connection(
740            connection_id=self.connection_id,
741            api_root=self.workspace.api_root,
742            client_id=self.workspace.client_id,
743            client_secret=self.workspace.client_secret,
744            bearer_token=self.workspace.bearer_token,
745            name=name,
746        )
747        self._connection_info = updated_response
748        return self
749
750    def set_table_prefix(self, prefix: str) -> CloudConnection:
751        """Set the table prefix for the connection.
752
753        Args:
754            prefix: New table prefix to use when syncing to the destination
755
756        Returns:
757            Updated CloudConnection object with refreshed info
758        """
759        updated_response = api_util.patch_connection(
760            connection_id=self.connection_id,
761            api_root=self.workspace.api_root,
762            client_id=self.workspace.client_id,
763            client_secret=self.workspace.client_secret,
764            bearer_token=self.workspace.bearer_token,
765            prefix=prefix,
766        )
767        self._connection_info = updated_response
768        return self
769
770    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
771        """Set the selected streams for the connection.
772
773        This is a destructive operation that can break existing connections if the
774        stream selection is changed incorrectly. Use with caution.
775
776        Args:
777            stream_names: List of stream names to sync
778
779        Returns:
780            Updated CloudConnection object with refreshed info
781        """
782        configurations = api_util.build_stream_configurations(stream_names)
783
784        updated_response = api_util.patch_connection(
785            connection_id=self.connection_id,
786            api_root=self.workspace.api_root,
787            client_id=self.workspace.client_id,
788            client_secret=self.workspace.client_secret,
789            bearer_token=self.workspace.bearer_token,
790            configurations=configurations,
791        )
792        self._connection_info = updated_response
793        return self
794
795    # Enable/Disable
796
797    @property
798    def enabled(self) -> bool:
799        """Get the current enabled status of the connection.
800
801        This property always fetches fresh data from the API to ensure accuracy,
802        as another process or user may have toggled the setting.
803
804        Returns:
805            True if the connection status is 'active', False otherwise.
806        """
807        connection_info = self._fetch_connection_info(force_refresh=True)
808        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
809
810    @enabled.setter
811    def enabled(self, value: bool) -> None:
812        """Set the enabled status of the connection.
813
814        Args:
815            value: True to enable (set status to 'active'), False to disable
816                (set status to 'inactive').
817        """
818        self.set_enabled(enabled=value)
819
820    def set_enabled(
821        self,
822        *,
823        enabled: bool,
824        ignore_noop: bool = True,
825    ) -> None:
826        """Set the enabled status of the connection.
827
828        Args:
829            enabled: True to enable (set status to 'active'), False to disable
830                (set status to 'inactive').
831            ignore_noop: If True (default), silently return if the connection is already
832                in the requested state. If False, raise ValueError when the requested
833                state matches the current state.
834
835        Raises:
836            ValueError: If ignore_noop is False and the connection is already in the
837                requested state.
838        """
839        # Always fetch fresh data to check current status
840        connection_info = self._fetch_connection_info(force_refresh=True)
841        current_status = connection_info.status
842        desired_status = (
843            api_util.models.ConnectionStatusEnum.ACTIVE
844            if enabled
845            else api_util.models.ConnectionStatusEnum.INACTIVE
846        )
847
848        if current_status == desired_status:
849            if ignore_noop:
850                return
851            raise ValueError(
852                f"Connection is already {'enabled' if enabled else 'disabled'}. "
853                f"Current status: {current_status}"
854            )
855
856        updated_response = api_util.patch_connection(
857            connection_id=self.connection_id,
858            api_root=self.workspace.api_root,
859            client_id=self.workspace.client_id,
860            client_secret=self.workspace.client_secret,
861            bearer_token=self.workspace.bearer_token,
862            status=desired_status,
863        )
864        self._connection_info = updated_response
865
866    # Scheduling
867
868    def set_schedule(
869        self,
870        cron_expression: str,
871    ) -> None:
872        """Set a cron schedule for the connection.
873
874        Args:
875            cron_expression: A cron expression defining when syncs should run.
876
877        Examples:
878                - "0 0 * * *" - Daily at midnight UTC
879                - "0 */6 * * *" - Every 6 hours
880                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
881        """
882        schedule = api_util.models.AirbyteAPIConnectionSchedule(
883            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
884            cron_expression=cron_expression,
885        )
886        updated_response = api_util.patch_connection(
887            connection_id=self.connection_id,
888            api_root=self.workspace.api_root,
889            client_id=self.workspace.client_id,
890            client_secret=self.workspace.client_secret,
891            bearer_token=self.workspace.bearer_token,
892            schedule=schedule,
893        )
894        self._connection_info = updated_response
895
896    def set_manual_schedule(self) -> None:
897        """Set the connection to manual scheduling.
898
899        Disables automatic syncs. Syncs will only run when manually triggered.
900        """
901        schedule = api_util.models.AirbyteAPIConnectionSchedule(
902            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
903        )
904        updated_response = api_util.patch_connection(
905            connection_id=self.connection_id,
906            api_root=self.workspace.api_root,
907            client_id=self.workspace.client_id,
908            client_secret=self.workspace.client_secret,
909            bearer_token=self.workspace.bearer_token,
910            schedule=schedule,
911        )
912        self._connection_info = updated_response
913
914    # Deletions
915
916    def permanently_delete(
917        self,
918        *,
919        cascade_delete_source: bool = False,
920        cascade_delete_destination: bool = False,
921    ) -> None:
922        """Delete the connection.
923
924        Args:
925            cascade_delete_source: Whether to also delete the source.
926            cascade_delete_destination: Whether to also delete the destination.
927        """
928        self.workspace.permanently_delete_connection(self)
929
930        if cascade_delete_source:
931            self.workspace.permanently_delete_source(self.source_id)
932
933        if cascade_delete_destination:
934            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
46    def __init__(
47        self,
48        workspace: CloudWorkspace,
49        connection_id: str,
50        source: str | None = None,
51        destination: str | None = None,
52    ) -> None:
53        """It is not recommended to create a `CloudConnection` object directly.
54
55        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
56        """
57        self.connection_id = connection_id
58        """The ID of the connection."""
59
60        self.workspace = workspace
61        """The workspace that the connection belongs to."""
62
63        self._source_id = source
64        """The ID of the source."""
65
66        self._destination_id = destination
67        """The ID of the destination."""
68
69        self._connection_info: ConnectionResponse | None = None
70        """The connection info object. (Cached.)"""
71
72        self._cloud_source_object: CloudSource | None = None
73        """The source object. (Cached.)"""
74
75        self._cloud_destination_object: CloudDestination | None = None
76        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
149    def check_is_valid(self) -> bool:
150        """Check if this connection exists and belongs to the expected workspace.
151
152        This method fetches connection info from the API (if not already cached) and
153        verifies that the connection's workspace_id matches the workspace associated
154        with this CloudConnection object.
155
156        Returns:
157            True if the connection exists and belongs to the expected workspace.
158
159        Raises:
160            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
161            AirbyteMissingResourceError: If the connection doesn't exist.
162        """
163        self._fetch_connection_info(force_refresh=False, verify=True)
164        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
name: str | None
184    @property
185    def name(self) -> str | None:
186        """Get the display name of the connection, if available.
187
188        E.g. "My Postgres to Snowflake", not the connection ID.
189        """
190        if not self._connection_info:
191            self._connection_info = self._fetch_connection_info()
192
193        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
195    @property
196    def source_id(self) -> str:
197        """The ID of the source."""
198        if not self._source_id:
199            if not self._connection_info:
200                self._connection_info = self._fetch_connection_info()
201
202            self._source_id = self._connection_info.source_id
203
204        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
206    @property
207    def source(self) -> CloudSource:
208        """Get the source object."""
209        if self._cloud_source_object:
210            return self._cloud_source_object
211
212        self._cloud_source_object = CloudSource(
213            workspace=self.workspace,
214            connector_id=self.source_id,
215        )
216        return self._cloud_source_object

Get the source object.

destination_id: str
218    @property
219    def destination_id(self) -> str:
220        """The ID of the destination."""
221        if not self._destination_id:
222            if not self._connection_info:
223                self._connection_info = self._fetch_connection_info()
224
225            self._destination_id = self._connection_info.destination_id
226
227        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
229    @property
230    def destination(self) -> CloudDestination:
231        """Get the destination object."""
232        if self._cloud_destination_object:
233            return self._cloud_destination_object
234
235        self._cloud_destination_object = CloudDestination(
236            workspace=self.workspace,
237            connector_id=self.destination_id,
238        )
239        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
241    @property
242    def stream_names(self) -> list[str]:
243        """The stream names."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
249    @property
250    def table_prefix(self) -> str:
251        """The table prefix."""
252        if not self._connection_info:
253            self._connection_info = self._fetch_connection_info()
254
255        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
257    @property
258    def connection_url(self) -> str | None:
259        """The web URL to the connection."""
260        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
262    @property
263    def job_history_url(self) -> str | None:
264        """The URL to the job history for the connection."""
265        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
269    def run_sync(
270        self,
271        *,
272        wait: bool = True,
273        wait_timeout: int = 300,
274    ) -> SyncResult:
275        """Run a sync."""
276        connection_response = api_util.run_connection(
277            connection_id=self.connection_id,
278            api_root=self.workspace.api_root,
279            workspace_id=self.workspace.workspace_id,
280            client_id=self.workspace.client_id,
281            client_secret=self.workspace.client_secret,
282            bearer_token=self.workspace.bearer_token,
283        )
284        sync_result = SyncResult(
285            workspace=self.workspace,
286            connection=self,
287            job_id=connection_response.job_id,
288        )
289
290        if wait:
291            sync_result.wait_for_completion(
292                wait_timeout=wait_timeout,
293                raise_failure=True,
294                raise_timeout=True,
295            )
296
297        return sync_result

Run a sync.

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True, job_type: airbyte_api.models.jobtypeenum.JobTypeEnum | None = None) -> list[airbyte.cloud.SyncResult]:
308    def get_previous_sync_logs(
309        self,
310        *,
311        limit: int = 20,
312        offset: int | None = None,
313        from_tail: bool = True,
314        job_type: JobTypeEnum | None = None,
315    ) -> list[SyncResult]:
316        """Get previous sync jobs for a connection with pagination support.
317
318        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
319        rows_synced, start_time). Full log text can be fetched lazily via
320        `SyncResult.get_full_log_text()`.
321
322        Args:
323            limit: Maximum number of jobs to return. Defaults to 20.
324            offset: Number of jobs to skip from the beginning. Defaults to None (0).
325            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
326                If False, returns jobs ordered oldest-first (createdAt ASC).
327                Defaults to True.
328            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
329                If not specified, defaults to sync and reset jobs only (API default behavior).
330
331        Returns:
332            A list of SyncResult objects representing the sync jobs.
333        """
334        order_by = (
335            api_util.JOB_ORDER_BY_CREATED_AT_DESC
336            if from_tail
337            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
338        )
339        sync_logs: list[JobResponse] = api_util.get_job_logs(
340            connection_id=self.connection_id,
341            api_root=self.workspace.api_root,
342            workspace_id=self.workspace.workspace_id,
343            limit=limit,
344            offset=offset,
345            order_by=order_by,
346            job_type=job_type,
347            client_id=self.workspace.client_id,
348            client_secret=self.workspace.client_secret,
349            bearer_token=self.workspace.bearer_token,
350        )
351        return [
352            SyncResult(
353                workspace=self.workspace,
354                connection=self,
355                job_id=sync_log.job_id,
356                _latest_job_info=sync_log,
357            )
358            for sync_log in sync_logs
359        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
  • job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:

A list of SyncResult objects representing the sync jobs.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
361    def get_sync_result(
362        self,
363        job_id: int | None = None,
364    ) -> SyncResult | None:
365        """Get the sync result for the connection.
366
367        If `job_id` is not provided, the most recent sync job will be used.
368
369        Returns `None` if job_id is omitted and no previous jobs are found.
370        """
371        if job_id is None:
372            # Get the most recent sync job
373            results = self.get_previous_sync_logs(
374                limit=1,
375            )
376            if results:
377                return results[0]
378
379            return None
380
381        # Get the sync job by ID (lazy loaded)
382        return SyncResult(
383            workspace=self.workspace,
384            connection=self,
385            job_id=job_id,
386        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
390    @deprecated("Use 'dump_raw_state()' instead.")
391    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
392        """Deprecated. Use `dump_raw_state()` instead."""
393        state_response = api_util.get_connection_state(
394            connection_id=self.connection_id,
395            api_root=self.workspace.api_root,
396            client_id=self.workspace.client_id,
397            client_secret=self.workspace.client_secret,
398            bearer_token=self.workspace.bearer_token,
399        )
400        if state_response.get("stateType") == "not_set":
401            return None
402        return state_response.get("streamState", [])

Deprecated. Use dump_raw_state() instead.

def dump_raw_state( self, *, normalize: bool = True) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
410    def dump_raw_state(
411        self,
412        *,
413        normalize: bool = True,
414    ) -> dict[str, Any] | list[dict[str, Any]]:
415        """Dump the state for this connection.
416
417        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
418        with snake_case keys, suitable for passing to a connector's `--state` flag.
419
420        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
421        includes `stateType` and `connectionId`). This raw format can be passed
422        directly to `import_raw_state()` for backup/restore workflows.
423
424        Args:
425            normalize: If `True` (default), convert to Airbyte protocol format.
426                If `False`, return the raw Config API response.
427
428        Returns:
429            Normalized: list of protocol-format state message dicts (empty list if
430            no state). Raw: the full Config API state dict.
431        """
432        raw = api_util.get_connection_state(
433            connection_id=self.connection_id,
434            api_root=self.workspace.api_root,
435            client_id=self.workspace.client_id,
436            client_secret=self.workspace.client_secret,
437            bearer_token=self.workspace.bearer_token,
438        )
439        if normalize:
440            return _normalize_state_to_protocol(raw)
441        return raw

Dump the state for this connection.

By default, returns a list of Airbyte protocol AirbyteStateMessage dicts with snake_case keys, suitable for passing to a connector's --state flag.

When normalize is False, returns the raw Config API dict (camelCase keys, includes stateType and connectionId). This raw format can be passed directly to import_raw_state() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API response.
Returns:

Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.

def import_raw_state( self, connection_state: dict[str, typing.Any] | list[dict[str, typing.Any]]) -> dict[str, typing.Any]:
443    def import_raw_state(
444        self,
445        connection_state: dict[str, Any] | list[dict[str, Any]],
446    ) -> dict[str, Any]:
447        """Import (restore) the full state for this connection.
448
449        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
450        > could result in broken connections, and/or incorrect sync behavior.
451
452        Replaces the entire connection state with the provided state blob.
453        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
454
455        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
456        The `connectionId` in the blob is always overridden with this connection's
457        ID, making state blobs portable across connections.
458
459        Accepts either format:
460
461        - **Config API format** (dict with `stateType`): passed through directly.
462        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
463          converted to Config API format before sending.
464
465        Args:
466            connection_state: Connection state in either Config API or Airbyte protocol format.
467
468        Returns:
469            The updated connection state as a dictionary.
470
471        Raises:
472            AirbyteConnectionSyncActiveError: If a sync is currently running on this
473                connection (HTTP 423). Wait for the sync to complete before retrying.
474        """
475        api_state: dict[str, Any]
476        if isinstance(connection_state, list):
477            if not _is_protocol_state_format(connection_state):
478                msg = (
479                    "Expected connection_state list to contain Airbyte protocol state "
480                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
481                    "or LEGACY). Got a list that does not match protocol format."
482                )
483                raise ValueError(msg)
484            api_state = _denormalize_protocol_state_to_api(
485                protocol_messages=connection_state,
486                connection_id=self.connection_id,
487            )
488        elif isinstance(connection_state, dict):
489            if _is_protocol_state_format(connection_state):
490                api_state = _denormalize_protocol_state_to_api(
491                    protocol_messages=[connection_state],
492                    connection_id=self.connection_id,
493                )
494            else:
495                api_state = connection_state
496        else:
497            msg = f"Expected a dict or list, got {type(connection_state)}"
498            raise TypeError(msg)
499
500        return api_util.replace_connection_state(
501            connection_id=self.connection_id,
502            connection_state_dict=api_state,
503            api_root=self.workspace.api_root,
504            client_id=self.workspace.client_id,
505            client_secret=self.workspace.client_secret,
506            bearer_token=self.workspace.bearer_token,
507        )

Import (restore) the full state for this connection.

⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.

Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

This is the counterpart to dump_raw_state() for backup/restore workflows. The connectionId in the blob is always overridden with this connection's ID, making state blobs portable across connections.

Accepts either format:

  • Config API format (dict with stateType): passed through directly.
  • Airbyte protocol format (list of AirbyteStateMessage dicts): automatically converted to Config API format before sending.
Arguments:
  • connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:

The updated connection state as a dictionary.

Raises:
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
def get_stream_state( self, stream_name: str, stream_namespace: str | None = None) -> dict[str, typing.Any] | None:
509    def get_stream_state(
510        self,
511        stream_name: str,
512        stream_namespace: str | None = None,
513    ) -> dict[str, Any] | None:
514        """Get the state blob for a single stream within this connection.
515
516        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
517        not the full connection state envelope.
518
519        This is compatible with `stream`-type state and stream-level entries
520        within a `global`-type state. It is not compatible with `legacy` state.
521        To get or set the entire connection-level state artifact, use
522        `dump_raw_state` and `import_raw_state` instead.
523
524        Args:
525            stream_name: The name of the stream to get state for.
526            stream_namespace: The source-side stream namespace. This refers to the
527                namespace from the source (e.g., database schema), not any destination
528                namespace override set in connection advanced settings.
529
530        Returns:
531            The stream's state blob as a dictionary, or None if the stream is not found.
532        """
533        state_data = self.dump_raw_state(normalize=False)
534        result = ConnectionStateResponse(**state_data)
535
536        streams = _get_stream_list(result)
537        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
538
539        if not matching:
540            available = [s.stream_descriptor.name for s in streams]
541            logger.warning(
542                "Stream '%s' not found in connection state for connection '%s'. "
543                "Available streams: %s",
544                stream_name,
545                self.connection_id,
546                available,
547            )
548            return None
549
550        return matching[0].stream_state

Get the state blob for a single stream within this connection.

Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Arguments:
  • stream_name: The name of the stream to get state for.
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:

The stream's state blob as a dictionary, or None if the stream is not found.

def set_stream_state( self, stream_name: str, state_blob_dict: dict[str, typing.Any], stream_namespace: str | None = None) -> None:
552    def set_stream_state(
553        self,
554        stream_name: str,
555        state_blob_dict: dict[str, Any],
556        stream_namespace: str | None = None,
557    ) -> None:
558        """Set the state for a single stream within this connection.
559
560        Fetches the current full state, replaces only the specified stream's state,
561        then sends the full updated state back to the API. If the stream does not
562        exist in the current state, it is appended.
563
564        This is compatible with `stream`-type state and stream-level entries
565        within a `global`-type state. It is not compatible with `legacy` state.
566        To get or set the entire connection-level state artifact, use
567        `dump_raw_state` and `import_raw_state` instead.
568
569        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
570
571        Args:
572            stream_name: The name of the stream to update state for.
573            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
574            stream_namespace: The source-side stream namespace. This refers to the
575                namespace from the source (e.g., database schema), not any destination
576                namespace override set in connection advanced settings.
577
578        Raises:
579            PyAirbyteInputError: If the connection state type is not supported for
580                stream-level operations (not_set, legacy).
581            AirbyteConnectionSyncActiveError: If a sync is currently running on this
582                connection (HTTP 423). Wait for the sync to complete before retrying.
583        """
584        state_data = self.dump_raw_state(normalize=False)
585        current = ConnectionStateResponse(**state_data)
586
587        if current.state_type == "not_set":
588            raise PyAirbyteInputError(
589                message="Cannot set stream state: connection has no existing state.",
590                context={"connection_id": self.connection_id},
591            )
592
593        if current.state_type == "legacy":
594            raise PyAirbyteInputError(
595                message="Cannot set stream state on a legacy-type connection state.",
596                context={"connection_id": self.connection_id},
597            )
598
599        new_stream_entry = {
600            "streamDescriptor": {
601                "name": stream_name,
602                **(
603                    {
604                        "namespace": stream_namespace,
605                    }
606                    if stream_namespace
607                    else {}
608                ),
609            },
610            "streamState": state_blob_dict,
611        }
612
613        raw_streams: list[dict[str, Any]]
614        if current.state_type == "stream":
615            raw_streams = state_data.get("streamState", [])
616        elif current.state_type == "global":
617            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
618        else:
619            raw_streams = []
620
621        streams = _get_stream_list(current)
622        found = False
623        updated_streams_raw: list[dict[str, Any]] = []
624        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
625            if _match_stream(parsed_s, stream_name, stream_namespace):
626                updated_streams_raw.append(new_stream_entry)
627                found = True
628            else:
629                updated_streams_raw.append(raw_s)
630
631        if not found:
632            updated_streams_raw.append(new_stream_entry)
633
634        full_state: dict[str, Any] = {
635            **state_data,
636        }
637
638        if current.state_type == "stream":
639            full_state["streamState"] = updated_streams_raw
640        elif current.state_type == "global":
641            original_global = state_data.get("globalState", {})
642            full_state["globalState"] = {
643                **original_global,
644                "streamStates": updated_streams_raw,
645            }
646
647        self.import_raw_state(full_state)

Set the state for a single stream within this connection.

Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Uses the safe variant that prevents updates while a sync is running (HTTP 423).

Arguments:
  • stream_name: The name of the stream to update state for.
  • state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
  • PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
649    @deprecated("Use 'dump_raw_catalog()' instead.")
650    def get_catalog_artifact(self) -> dict[str, Any] | None:
651        """Get the configured catalog for this connection.
652
653        Returns the full configured catalog (syncCatalog) for this connection,
654        including stream schemas, sync modes, cursor fields, and primary keys.
655
656        Uses the Config API endpoint: POST /v1/web_backend/connections/get
657
658        Returns:
659            Dictionary containing the configured catalog, or `None` if not found.
660        """
661        return self.dump_raw_catalog()

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.

def dump_raw_catalog(self, *, normalize: bool = True) -> dict[str, typing.Any] | None:
663    def dump_raw_catalog(
664        self,
665        *,
666        normalize: bool = True,
667    ) -> dict[str, Any] | None:
668        """Dump the configured catalog for this connection.
669
670        By default, returns the catalog in Airbyte protocol format
671        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
672        to a connector's `--catalog` flag.
673
674        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
675        Config API (camelCase keys, nested `config` block). This raw format can be
676        passed directly to `import_raw_catalog()` for backup/restore workflows.
677
678        Args:
679            normalize: If `True` (default), convert to Airbyte protocol format.
680                If `False`, return the raw Config API catalog.
681
682        Returns:
683            The configured catalog dict, or `None` if not found.
684        """
685        connection_response = api_util.get_connection_catalog(
686            connection_id=self.connection_id,
687            api_root=self.workspace.api_root,
688            client_id=self.workspace.client_id,
689            client_secret=self.workspace.client_secret,
690            bearer_token=self.workspace.bearer_token,
691        )
692        raw = connection_response.get("syncCatalog")
693        if raw is None:
694            return None
695        if normalize:
696            return _normalize_catalog_to_protocol(raw)
697        return raw

Dump the configured catalog for this connection.

By default, returns the catalog in Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys), suitable for passing to a connector's --catalog flag.

When normalize is False, returns the raw syncCatalog dict from the Config API (camelCase keys, nested config block). This raw format can be passed directly to import_raw_catalog() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API catalog.
Returns:

The configured catalog dict, or None if not found.

def import_raw_catalog(self, catalog: dict[str, typing.Any]) -> None:
699    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
700        """Replace the configured catalog for this connection.
701
702        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
703        > could result in broken connections, and/or incorrect sync behavior.
704
705        Accepts a configured catalog dict and replaces the connection's entire
706        catalog with it. All other connection settings remain unchanged.
707
708        Accepts either format:
709
710        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
711          passed through directly.
712        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
713          automatically converted to Config API format before sending.
714
715        Args:
716            catalog: The configured catalog dict in either format.
717        """
718        if _is_protocol_catalog_format(catalog):
719            catalog = _denormalize_catalog_to_api(catalog)
720
721        api_util.replace_connection_catalog(
722            connection_id=self.connection_id,
723            configured_catalog_dict=catalog,
724            api_root=self.workspace.api_root,
725            client_id=self.workspace.client_id,
726            client_secret=self.workspace.client_secret,
727            bearer_token=self.workspace.bearer_token,
728        )

Replace the configured catalog for this connection.

⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections and/or incorrect sync behavior.

Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.

Accepts either format:

  • Config API format (syncCatalog with camelCase keys and nested config): passed through directly.
  • Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys): automatically converted to Config API format before sending.
Arguments:
  • catalog: The configured catalog dict in either format.
def rename(self, name: str) -> CloudConnection:
730    def rename(self, name: str) -> CloudConnection:
731        """Rename the connection.
732
733        Args:
734            name: New name for the connection
735
736        Returns:
737            Updated CloudConnection object with refreshed info
738        """
739        updated_response = api_util.patch_connection(
740            connection_id=self.connection_id,
741            api_root=self.workspace.api_root,
742            client_id=self.workspace.client_id,
743            client_secret=self.workspace.client_secret,
744            bearer_token=self.workspace.bearer_token,
745            name=name,
746        )
747        self._connection_info = updated_response
748        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
750    def set_table_prefix(self, prefix: str) -> CloudConnection:
751        """Set the table prefix for the connection.
752
753        Args:
754            prefix: New table prefix to use when syncing to the destination
755
756        Returns:
757            Updated CloudConnection object with refreshed info
758        """
759        updated_response = api_util.patch_connection(
760            connection_id=self.connection_id,
761            api_root=self.workspace.api_root,
762            client_id=self.workspace.client_id,
763            client_secret=self.workspace.client_secret,
764            bearer_token=self.workspace.bearer_token,
765            prefix=prefix,
766        )
767        self._connection_info = updated_response
768        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
770    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
771        """Set the selected streams for the connection.
772
773        This is a destructive operation that can break existing connections if the
774        stream selection is changed incorrectly. Use with caution.
775
776        Args:
777            stream_names: List of stream names to sync
778
779        Returns:
780            Updated CloudConnection object with refreshed info
781        """
782        configurations = api_util.build_stream_configurations(stream_names)
783
784        updated_response = api_util.patch_connection(
785            connection_id=self.connection_id,
786            api_root=self.workspace.api_root,
787            client_id=self.workspace.client_id,
788            client_secret=self.workspace.client_secret,
789            bearer_token=self.workspace.bearer_token,
790            configurations=configurations,
791        )
792        self._connection_info = updated_response
793        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info

enabled: bool
797    @property
798    def enabled(self) -> bool:
799        """Get the current enabled status of the connection.
800
801        This property always fetches fresh data from the API to ensure accuracy,
802        as another process or user may have toggled the setting.
803
804        Returns:
805            True if the connection status is 'active', False otherwise.
806        """
807        connection_info = self._fetch_connection_info(force_refresh=True)
808        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
820    def set_enabled(
821        self,
822        *,
823        enabled: bool,
824        ignore_noop: bool = True,
825    ) -> None:
826        """Set the enabled status of the connection.
827
828        Args:
829            enabled: True to enable (set status to 'active'), False to disable
830                (set status to 'inactive').
831            ignore_noop: If True (default), silently return if the connection is already
832                in the requested state. If False, raise ValueError when the requested
833                state matches the current state.
834
835        Raises:
836            ValueError: If ignore_noop is False and the connection is already in the
837                requested state.
838        """
839        # Always fetch fresh data to check current status
840        connection_info = self._fetch_connection_info(force_refresh=True)
841        current_status = connection_info.status
842        desired_status = (
843            api_util.models.ConnectionStatusEnum.ACTIVE
844            if enabled
845            else api_util.models.ConnectionStatusEnum.INACTIVE
846        )
847
848        if current_status == desired_status:
849            if ignore_noop:
850                return
851            raise ValueError(
852                f"Connection is already {'enabled' if enabled else 'disabled'}. "
853                f"Current status: {current_status}"
854            )
855
856        updated_response = api_util.patch_connection(
857            connection_id=self.connection_id,
858            api_root=self.workspace.api_root,
859            client_id=self.workspace.client_id,
860            client_secret=self.workspace.client_secret,
861            bearer_token=self.workspace.bearer_token,
862            status=desired_status,
863        )
864        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
def set_schedule(self, cron_expression: str) -> None:
868    def set_schedule(
869        self,
870        cron_expression: str,
871    ) -> None:
872        """Set a cron schedule for the connection.
873
874        Args:
875            cron_expression: A cron expression defining when syncs should run.
876
877        Examples:
878                - "0 0 * * *" - Daily at midnight UTC
879                - "0 */6 * * *" - Every 6 hours
880                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
881        """
882        schedule = api_util.models.AirbyteAPIConnectionSchedule(
883            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
884            cron_expression=cron_expression,
885        )
886        updated_response = api_util.patch_connection(
887            connection_id=self.connection_id,
888            api_root=self.workspace.api_root,
889            client_id=self.workspace.client_id,
890            client_secret=self.workspace.client_secret,
891            bearer_token=self.workspace.bearer_token,
892            schedule=schedule,
893        )
894        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
896    def set_manual_schedule(self) -> None:
897        """Set the connection to manual scheduling.
898
899        Disables automatic syncs. Syncs will only run when manually triggered.
900        """
901        schedule = api_util.models.AirbyteAPIConnectionSchedule(
902            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
903        )
904        updated_response = api_util.patch_connection(
905            connection_id=self.connection_id,
906            api_root=self.workspace.api_root,
907            client_id=self.workspace.client_id,
908            client_secret=self.workspace.client_secret,
909            bearer_token=self.workspace.bearer_token,
910            schedule=schedule,
911        )
912        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
916    def permanently_delete(
917        self,
918        *,
919        cascade_delete_source: bool = False,
920        cascade_delete_destination: bool = False,
921    ) -> None:
922        """Delete the connection.
923
924        Args:
925            cascade_delete_source: Whether to also delete the source.
926            cascade_delete_destination: Whether to also delete the destination.
927        """
928        self.workspace.permanently_delete_connection(self)
929
930        if cascade_delete_source:
931            self.workspace.permanently_delete_source(self.source_id)
932
933        if cascade_delete_destination:
934            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.